From bcf858245a9d187b209c15553863cc2b219ed19d Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 17 Apr 2026 17:59:06 -0700 Subject: [PATCH] Add SANO sample --- nexus_standalone_operations/README.md | 52 ++++++++++++ nexus_standalone_operations/__init__.py | 0 nexus_standalone_operations/handler.py | 42 ++++++++++ nexus_standalone_operations/service.py | 39 +++++++++ nexus_standalone_operations/starter.py | 74 +++++++++++++++++ nexus_standalone_operations/worker.py | 41 ++++++++++ pyproject.toml | 3 + tests/nexus_standalone_operations/__init__.py | 0 .../nexus_standalone_operations_test.py | 81 +++++++++++++++++++ uv.lock | 16 +--- 10 files changed, 336 insertions(+), 12 deletions(-) create mode 100644 nexus_standalone_operations/README.md create mode 100644 nexus_standalone_operations/__init__.py create mode 100644 nexus_standalone_operations/handler.py create mode 100644 nexus_standalone_operations/service.py create mode 100644 nexus_standalone_operations/starter.py create mode 100644 nexus_standalone_operations/worker.py create mode 100644 tests/nexus_standalone_operations/__init__.py create mode 100644 tests/nexus_standalone_operations/nexus_standalone_operations_test.py diff --git a/nexus_standalone_operations/README.md b/nexus_standalone_operations/README.md new file mode 100644 index 00000000..0a776636 --- /dev/null +++ b/nexus_standalone_operations/README.md @@ -0,0 +1,52 @@ +This sample demonstrates how to execute Nexus operations directly from client code, +without wrapping them in a workflow. It shows both synchronous and asynchronous +(workflow-backed) operations, plus listing and counting operations. + +## Note: Standalone Nexus operations require a server version that supports this feature. + +### Sample directory structure + +- [service.py](./service.py) - Nexus service definition with echo (sync) and hello (async) operations +- [handler.py](./handler.py) - Nexus operation handlers and the backing workflow for the async operation +- [worker.py](./worker.py) - Temporal worker that hosts the Nexus service +- [starter.py](./starter.py) - Client that executes standalone Nexus operations + + +### Instructions + +Start a Temporal server. (See the main samples repo [README](../README.md)). + +Create the Nexus endpoint: + +``` +temporal operator nexus endpoint create \ + --name nexus-standalone-operations-endpoint \ + --target-namespace default \ + --target-task-queue nexus-standalone-operations +``` + +In one terminal, start the worker: +``` +uv run nexus_standalone_operations/worker.py +``` + +In another terminal, run the starter: +``` +uv run nexus_standalone_operations/starter.py +``` + +### Expected output + +``` +Echo result: hello +Hello result: Hello, World! + +Listing Nexus operations: + OperationId: echo-..., Operation: echo, Status: COMPLETED + OperationId: hello-..., Operation: hello, Status: COMPLETED + +Total Nexus operations: 2 +``` + +If you run the starter code multiple times, you should see additional operations in the listing results, as more operations are run. +The same goes for the total number of operations. \ No newline at end of file diff --git a/nexus_standalone_operations/__init__.py b/nexus_standalone_operations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_standalone_operations/handler.py b/nexus_standalone_operations/handler.py new file mode 100644 index 00000000..af867058 --- /dev/null +++ b/nexus_standalone_operations/handler.py @@ -0,0 +1,42 @@ +"""Nexus service handler and backing workflow for standalone operations sample.""" + +from __future__ import annotations + +import uuid + +import nexusrpc.handler +from temporalio import nexus, workflow + +from nexus_standalone_operations.service import ( + EchoInput, + EchoOutput, + HelloInput, + HelloOutput, + MyNexusService, +) + + +@workflow.defn +class HelloWorkflow: + @workflow.run + async def run(self, input: HelloInput) -> HelloOutput: + return HelloOutput(greeting=f"Hello, {input.name}!") + + +@nexusrpc.handler.service_handler(service=MyNexusService) +class MyNexusServiceHandler: + @nexusrpc.handler.sync_operation + async def echo( + self, _ctx: nexusrpc.handler.StartOperationContext, input: EchoInput + ) -> EchoOutput: + return EchoOutput(message=input.message) + + @nexus.workflow_run_operation + async def hello( + self, ctx: nexus.WorkflowRunOperationContext, input: HelloInput + ) -> nexus.WorkflowHandle[HelloOutput]: + return await ctx.start_workflow( + HelloWorkflow.run, + input, + id=str(uuid.uuid4()), + ) diff --git a/nexus_standalone_operations/service.py b/nexus_standalone_operations/service.py new file mode 100644 index 00000000..a8a906db --- /dev/null +++ b/nexus_standalone_operations/service.py @@ -0,0 +1,39 @@ +"""Nexus service definition for standalone operations sample. + +Defines a Nexus service with two operations: +- echo: a synchronous operation that echoes the input message +- hello: an asynchronous (workflow-backed) operation that returns a greeting + +This service definition is used by both the handler (to validate operation +signatures) and the client (to create type-safe nexus clients). +""" + +from dataclasses import dataclass + +import nexusrpc + + +@dataclass +class EchoInput: + message: str + + +@dataclass +class EchoOutput: + message: str + + +@dataclass +class HelloInput: + name: str + + +@dataclass +class HelloOutput: + greeting: str + + +@nexusrpc.service +class MyNexusService: + echo: nexusrpc.Operation[EchoInput, EchoOutput] + hello: nexusrpc.Operation[HelloInput, HelloOutput] diff --git a/nexus_standalone_operations/starter.py b/nexus_standalone_operations/starter.py new file mode 100644 index 00000000..527cfdd3 --- /dev/null +++ b/nexus_standalone_operations/starter.py @@ -0,0 +1,74 @@ +"""Starter that demonstrates standalone Nexus operation execution. + +Unlike other Nexus samples that call operations from within a workflow, this +sample executes Nexus operations directly from client code using the standalone +Nexus operation APIs. +""" + +import asyncio +import uuid +from datetime import timedelta + +from temporalio.client import Client +from temporalio.envconfig import ClientConfig + +from nexus_standalone_operations.service import ( + EchoInput, + HelloInput, + MyNexusService, +) + +ENDPOINT_NAME = "nexus-standalone-operations-endpoint" + + +async def main() -> None: + config = ClientConfig.load_client_connect_config() + _ = config.setdefault("target_host", "localhost:7233") + client = await Client.connect(**config) + + # Create a typed NexusClient bound to the endpoint and service. + # The endpoint must be pre-created on the server (see README). + nexus_client = client.create_nexus_client( + service=MyNexusService, endpoint=ENDPOINT_NAME + ) + + # Start sync echo operation and await the result immediately. + echo_result = await nexus_client.execute_operation( + MyNexusService.echo, + EchoInput(message="hello"), + id=f"echo-{uuid.uuid4()}", + schedule_to_close_timeout=timedelta(seconds=10), + ) + print(f"Echo result: {echo_result.message}") + + # Start async (workflow-backed) hello operation and get a NexusOperationHandle. + handle = await nexus_client.start_operation( + MyNexusService.hello, + HelloInput(name="World"), + id=f"hello-{uuid.uuid4()}", + schedule_to_close_timeout=timedelta(seconds=10), + ) + + print(f"\nStarted `MyNexusService.Hello`. OperationID: {handle.operation_id}") + + # Use the NexusOperationHandle to await the result of the operation. + hello_result = await handle.result() + print(f"`MyNexusService.Hello` result: {hello_result.greeting}") + + # List nexus operations. + query = f'Endpoint = "{ENDPOINT_NAME}"' + print("\nListing Nexus operations:") + async for op in client.list_nexus_operations(query): + print( + f" OperationId: {op.operation_id},", + f" Operation: {op.operation},", + f" Status: {op.status.name}", + ) + + # Count nexus operations. + count = await client.count_nexus_operations(query) + print(f"\nTotal Nexus operations: {count.count}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/nexus_standalone_operations/worker.py b/nexus_standalone_operations/worker.py new file mode 100644 index 00000000..0de4ac3b --- /dev/null +++ b/nexus_standalone_operations/worker.py @@ -0,0 +1,41 @@ +"""Worker that hosts the Nexus service for standalone operations sample.""" + +import asyncio +import logging + +from temporalio.client import Client +from temporalio.envconfig import ClientConfig +from temporalio.worker import Worker + +from nexus_standalone_operations.handler import HelloWorkflow, MyNexusServiceHandler + +interrupt_event = asyncio.Event() + +TASK_QUEUE = "nexus-standalone-operations" + + +async def main() -> None: + logging.basicConfig(level=logging.INFO) + + config = ClientConfig.load_client_connect_config() + _ = config.setdefault("target_host", "localhost:7233") + client = await Client.connect(**config) + + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[HelloWorkflow], + nexus_service_handlers=[MyNexusServiceHandler()], + ): + logging.info("Worker started, ctrl+c to exit") + _ = await interrupt_event.wait() + logging.info("Shutting down") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/pyproject.toml b/pyproject.toml index f65ff1c2..e627174e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -140,3 +140,6 @@ ignore_errors = true [[tool.mypy.overrides]] module = "opentelemetry.*" ignore_errors = true + +[tool.uv.sources] +temporalio = { git = "https://github.com/temporalio/sdk-python", branch = "amazzeo/sano" } diff --git a/tests/nexus_standalone_operations/__init__.py b/tests/nexus_standalone_operations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/nexus_standalone_operations/nexus_standalone_operations_test.py b/tests/nexus_standalone_operations/nexus_standalone_operations_test.py new file mode 100644 index 00000000..a8302b93 --- /dev/null +++ b/tests/nexus_standalone_operations/nexus_standalone_operations_test.py @@ -0,0 +1,81 @@ +import asyncio +import os +import uuid +from datetime import timedelta + +import pytest +from temporalio.client import Client, NexusOperationFailureError +from temporalio.service import RPCError +from temporalio.worker import Worker + +from nexus_standalone_operations.handler import HelloWorkflow, MyNexusServiceHandler +from nexus_standalone_operations.service import ( + EchoInput, + EchoOutput, + HelloInput, + HelloOutput, + MyNexusService, +) +from nexus_standalone_operations.worker import TASK_QUEUE +from tests.helpers.nexus import create_nexus_endpoint, delete_nexus_endpoint + + +async def test_nexus_standalone_operations(client: Client): + if not os.getenv("ENABLE_STANDALONE_NEXUS_TESTS"): + pytest.skip( + "Standalone Nexus operations not yet supported by default dev server. Set ENABLE_STANDALONE_NEXUS_TESTS=1 to enable." + ) + + endpoint_name = f"test-nexus-standalone-{uuid.uuid4()}" + + create_response = await create_nexus_endpoint( + name=endpoint_name, + task_queue=TASK_QUEUE, + client=client, + ) + try: + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[HelloWorkflow], + nexus_service_handlers=[MyNexusServiceHandler()], + ): + nexus_client = client.create_nexus_client( + service=MyNexusService, endpoint=endpoint_name + ) + + # Test sync echo operation (with retry for endpoint propagation) + echo_result = None + for _ in range(30): + try: + echo_result = await nexus_client.execute_operation( + MyNexusService.echo, + EchoInput(message="test-echo"), + id=str(uuid.uuid4()), + schedule_to_close_timeout=timedelta(seconds=10), + ) + break + except (RPCError, NexusOperationFailureError): + await asyncio.sleep(0.5) + assert isinstance(echo_result, EchoOutput) + assert echo_result.message == "test-echo" + + # Test async hello operation + hello_result = await nexus_client.execute_operation( + MyNexusService.hello, + HelloInput(name="Test"), + id=str(uuid.uuid4()), + schedule_to_close_timeout=timedelta(seconds=10), + ) + assert isinstance(hello_result, HelloOutput) + assert hello_result.greeting == "Hello, Test!" + + # Test count operations + count = await client.count_nexus_operations(f'Endpoint = "{endpoint_name}"') + assert count.count >= 0 + finally: + _ = await delete_nexus_endpoint( + id=create_response.endpoint.id, + version=create_response.endpoint.version, + client=client, + ) diff --git a/uv.lock b/uv.lock index 4a22edea..5e85e632 100644 --- a/uv.lock +++ b/uv.lock @@ -2752,7 +2752,7 @@ wheels = [ [[package]] name = "temporalio" version = "1.26.0" -source = { registry = "https://pypi.org/simple" } +source = { git = "https://github.com/temporalio/sdk-python?branch=amazzeo%2Fsano#325d8bca9805bb5b392c6c8e28bbf985a27ff5ac" } dependencies = [ { name = "nexus-rpc" }, { name = "protobuf" }, @@ -2760,14 +2760,6 @@ dependencies = [ { name = "types-protobuf" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/ae/d4/fa21150a225393f87732ed6fef3cc9735d9e751edc6be415fe6e375105c6/temporalio-1.26.0.tar.gz", hash = "sha256:f4bfb35125e6f5e8c7f7ed1277c7354d812c6fac7ed5f8dbd50536cf289aaaa7", size = 2388994, upload-time = "2026-04-15T23:43:00.911Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/1e/27/8c421c622d18cc8e034247d5d72b89e6456937344b5bec1de40abef3c085/temporalio-1.26.0-cp310-abi3-macosx_10_12_x86_64.whl", hash = "sha256:5489040c0cf621edeb36984199dd9e4fbd2b3a07d61a4f2a8da1f2cb9820ef26", size = 14221070, upload-time = "2026-04-15T23:42:26.21Z" }, - { url = "https://files.pythonhosted.org/packages/49/7c/d2b691d16ec5db87198c2e08dbfba58e286c096faee15753613a581abdce/temporalio-1.26.0-cp310-abi3-macosx_11_0_arm64.whl", hash = "sha256:b18dd85771509c19ef059a31908bcd4e6130d1f67037c4db519702f3f2ad6d4a", size = 13583991, upload-time = "2026-04-15T23:42:34.357Z" }, - { url = "https://files.pythonhosted.org/packages/05/ca/b8728451320ca9d8bb6e1680b9bd23767118f86d5b8644edf2304d533f1b/temporalio-1.26.0-cp310-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:46187d5f82ca2ae81f35ea5916a76db0e2f067210dc6b1852c3749475721946e", size = 13808036, upload-time = "2026-04-15T23:42:42.757Z" }, - { url = "https://files.pythonhosted.org/packages/cb/54/3113f5e0ac58655790abac64656373e06191b351d74bfb94692e81bd6784/temporalio-1.26.0-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:03300c3e5237443367ac61bb20bd726c656b3daa50310bdd436599d5bdc7cf97", size = 14336604, upload-time = "2026-04-15T23:42:49.851Z" }, - { url = "https://files.pythonhosted.org/packages/fd/9b/c50840a26af3587c0c8d9af04d9976743e22496996dc1a377efc75dcd316/temporalio-1.26.0-cp310-abi3-win_amd64.whl", hash = "sha256:1c4a0d82f0a3796cbf78864c799f8dca0b94cdaec68e7b8b224c859005686ec4", size = 14525849, upload-time = "2026-04-15T23:42:57.589Z" }, -] [package.optional-dependencies] openai-agents = [ @@ -2853,7 +2845,7 @@ trio-async = [ ] [package.metadata] -requires-dist = [{ name = "temporalio", specifier = ">=1.26.0,<2" }] +requires-dist = [{ name = "temporalio", git = "https://github.com/temporalio/sdk-python?branch=amazzeo%2Fsano" }] [package.metadata.requires-dev] bedrock = [{ name = "boto3", specifier = ">=1.34.92,<2" }] @@ -2896,12 +2888,12 @@ langchain = [ nexus = [{ name = "nexus-rpc", specifier = ">=1.1.0,<2" }] open-telemetry = [ { name = "opentelemetry-exporter-otlp-proto-grpc" }, - { name = "temporalio", extras = ["opentelemetry"] }, + { name = "temporalio", extras = ["opentelemetry"], git = "https://github.com/temporalio/sdk-python?branch=amazzeo%2Fsano" }, ] openai-agents = [ { name = "openai-agents", extras = ["litellm"], specifier = ">=0.14.1" }, { name = "requests", specifier = ">=2.32.0,<3" }, - { name = "temporalio", extras = ["openai-agents", "opentelemetry"], specifier = ">=1.26.0" }, + { name = "temporalio", extras = ["openai-agents", "opentelemetry"], git = "https://github.com/temporalio/sdk-python?branch=amazzeo%2Fsano" }, ] pydantic-converter = [{ name = "pydantic", specifier = ">=2.10.6,<3" }] sentry = [{ name = "sentry-sdk", specifier = ">=2.13.0" }]