Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
1a82c82
Add standalone Nexus client (start/describe/cancel/terminate/list/cou…
Evanthx May 19, 2026
ba629e4
Renaming handle classes
Evanthx May 19, 2026
ba6c27f
Removing the renamed classes
Evanthx May 19, 2026
352569d
Using endpoints from the rule instead of creating them
Evanthx May 19, 2026
67e6429
Moving common test code to SDKTestWorkflowRule
Evanthx May 19, 2026
9094bdc
Ensuring we are using an external server, as the internal one doesn't…
Evanthx May 20, 2026
6c52466
PR changes for comments
Evanthx May 27, 2026
399a111
Changing deadline behavior
Evanthx May 27, 2026
151b86e
Addressing PR comments
Evanthx May 27, 2026
467e2ae
Responding to PR comments
Evanthx May 27, 2026
e73b44a
Removed a file
Evanthx May 27, 2026
da11cb9
PR comments
Evanthx May 27, 2026
b76efba
PR responses
Evanthx May 28, 2026
4d08d5e
Updated exception handling
Evanthx May 28, 2026
0e69f85
Test updates
Evanthx May 28, 2026
dcc1d6c
Removing some timeouts
Evanthx May 28, 2026
74f4730
Refactor due to PR comment
Evanthx May 28, 2026
bcf23d0
More tests
Evanthx May 28, 2026
a94fb8c
Adding more tests
Evanthx May 28, 2026
afbf002
More checking in a test
Evanthx May 29, 2026
7f17bee
Updated a test
Evanthx May 29, 2026
8a3c2b8
Adding async tests
Evanthx May 29, 2026
8bc345e
Some improvements
Evanthx May 29, 2026
fc65dab
Adding tests
Evanthx May 29, 2026
ec9e103
Adding tests
Evanthx May 29, 2026
4ffa0cd
Don't run the tests against the in-memory server as it doesn't suppor…
Evanthx May 29, 2026
c308856
Adding prereq check for SANO
Evanthx May 29, 2026
46b973e
Checking for more SANO operations
Evanthx May 29, 2026
599a731
Test fix
Evanthx May 29, 2026
3d7e218
Restoring test
Evanthx May 29, 2026
35f0c2a
Now requiring UUID
Evanthx May 29, 2026
96e9abb
Some more checks on ID
Evanthx May 29, 2026
d3c18dd
Cleaned up some inputs
Evanthx May 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public interface ActivityHandle<R> extends UntypedActivityHandle {

/**
* Blocks until the standalone activity completes and returns the typed result, or throws if the
* client-side timeout expires first.
*
* <p>client-side timeout expires first.
*
* @param timeout maximum time to wait
* @param unit unit of {@code timeout}
Expand Down
151 changes: 151 additions & 0 deletions temporal-sdk/src/main/java/io/temporal/client/NexusClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package io.temporal.client;

import io.temporal.common.Experimental;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.lang.reflect.Type;
import java.util.stream.Stream;
import javax.annotation.Nullable;

/**
* Client for managing standalone Nexus operation executions. Obtain an instance via {@link
* #newInstance(WorkflowServiceStubs)} or {@link #newInstance(WorkflowServiceStubs,
* NexusClientOptions)}. Do not create this object per request; share it for the lifetime of the
* process.
*
* <p>Standalone Nexus operations run independently of any workflow — they are scheduled, monitored,
* and managed directly through this client (and the service-bound clients it produces) rather than
* from within a workflow execution.
*
* <p>To start operations, build a service-bound client and call {@code start}/{@code execute}:
*
* <pre>{@code
* NexusClient client = NexusClient.newInstance(stubs, options);
*
* // Typed: bind to an @ServiceInterface and invoke a method reference.
* NexusServiceClient<MyService> svc =
* NexusServiceClient.newInstance(MyService.class, "my-endpoint", stubs, options);
* String result = svc.execute(MyService::greet, "world");
*
* // Untyped: dispatch by operation name string.
* UntypedNexusServiceClient untyped =
* client.newUntypedNexusServiceClient("my-endpoint", "MyService");
* UntypedNexusOperationHandle handle = untyped.start("greet", null, "world");
* }</pre>
*
* <p>To act on an existing operation (describe, cancel, terminate, get result), obtain a handle via
* {@link #getHandle}:
*
* <pre>{@code
* NexusOperationHandle<String> handle = client.getHandle(operationId, runId, String.class);
* String result = handle.getResult();
* handle.cancel("user requested");
* }</pre>
*
* <p>For visibility queries across all operations in the namespace, see {@link
* #listNexusOperationExecutions} and {@link #countNexusOperationExecutions}.
*
* @see NexusServiceClient
* @see UntypedNexusServiceClient
* @see NexusOperationHandle
*/
@Experimental
public interface NexusClient {
Comment thread
Evanthx marked this conversation as resolved.

/**
* Creates a client with default {@link NexusClientOptions}.
*
* @param service gRPC stubs connected to a Temporal Service endpoint
*/
static NexusClient newInstance(WorkflowServiceStubs service) {
return NexusClientImpl.newInstance(service, NexusClientOptions.getDefaultInstance());
}

/**
* Creates a client with the supplied options.
*
* @param service gRPC stubs connected to a Temporal Service endpoint
* @param options namespace, data converter, interceptors, and defaults applied to operations
* started through this client
*/
static NexusClient newInstance(WorkflowServiceStubs service, NexusClientOptions options) {
return NexusClientImpl.newInstance(service, options);
}

/** Returns the underlying gRPC stubs this client routes RPCs through. */
WorkflowServiceStubs getWorkflowServiceStubs();

/**
* Returns an untyped handle to an existing operation execution, targeting the latest run. To bind
* a result type, wrap the handle with {@link NexusOperationHandle#fromUntyped}.
*
* @param operationId the user-assigned operation ID
* @return an untyped handle
*/
UntypedNexusOperationHandle getHandle(String operationId);

/**
* Returns an untyped handle to an existing operation execution, optionally pinned to a specific
* run.
*
* @param operationId the user-assigned operation ID
* @param runId the server-assigned run ID, or {@code null} to target the latest run
* @return an untyped handle
*/
UntypedNexusOperationHandle getHandle(String operationId, @Nullable String runId);

/**
* Returns a typed handle to an existing operation execution, bound to {@code resultClass}.
*
* @param operationId the user-assigned operation ID
* @param runId the server-assigned run ID, or {@code null} to target the latest run
* @param resultClass expected result type
* @param <R> result type
*/
<R> NexusOperationHandle<R> getHandle(
String operationId, @Nullable String runId, Class<R> resultClass);

/**
* Returns a typed handle to an existing operation execution, bound to {@code resultClass}/{@code
* resultType}. Use the {@code resultType} variant when the result is a generic type whose
* parameters cannot be captured by {@link Class} alone (e.g. {@code List<String>}).
*
* @param operationId the user-assigned operation ID
* @param runId the server-assigned run ID, or {@code null} to target the latest run
* @param resultClass expected result class
* @param resultType generic type for deserialization; may be {@code null}
* @param <R> result type
*/
<R> NexusOperationHandle<R> getHandle(
String operationId, @Nullable String runId, Class<R> resultClass, @Nullable Type resultType);

/**
* Builds an untyped service-bound client targeting the given endpoint and service. Use this to
* dispatch operations by name string when no service interface is available.
*
* @param endpoint Nexus endpoint name registered on the Temporal Service
* @param serviceName Nexus service name on that endpoint
*/
UntypedNexusServiceClient newUntypedNexusServiceClient(String endpoint, String serviceName);

/**
* Returns a stream of standalone Nexus operation executions matching the given visibility query.
* The stream paginates lazily over server-side results — pages are fetched on demand as the
* stream is consumed.
*
* @param query Temporal visibility query string, or {@code null} to return all executions in the
* client namespace
* @return a lazy stream of matching executions
*/
Stream<NexusOperationExecutionMetadata> listNexusOperationExecutions(@Nullable String query);

/**
* Returns the count of standalone Nexus operation executions matching the given visibility query,
* optionally with aggregation groups.
*
* @param query Temporal visibility query string, or {@code null} to count all executions in the
* client namespace
* @return execution count, optionally with aggregation groups when the query uses {@code GROUP
* BY}
*/
NexusOperationExecutionCount countNexusOperationExecutions(@Nullable String query);
}
154 changes: 154 additions & 0 deletions temporal-sdk/src/main/java/io/temporal/client/NexusClientImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package io.temporal.client;

import static io.temporal.internal.WorkflowThreadMarker.enforceNonWorkflowThread;

import com.uber.m3.tally.Scope;
import io.temporal.common.Experimental;
import io.temporal.common.interceptors.NexusClientCallsInterceptor;
import io.temporal.common.interceptors.NexusClientCallsInterceptor.CountNexusOperationExecutionsInput;
import io.temporal.common.interceptors.NexusClientCallsInterceptor.CountNexusOperationExecutionsOutput;
import io.temporal.common.interceptors.NexusClientCallsInterceptor.ListNexusOperationExecutionsInput;
import io.temporal.common.interceptors.NexusClientCallsInterceptor.ListNexusOperationExecutionsOutput;
import io.temporal.common.interceptors.NexusClientInterceptor;
import io.temporal.internal.WorkflowThreadMarker;
import io.temporal.internal.client.NamespaceInjectWorkflowServiceStubs;
import io.temporal.internal.client.NexusOperationHandleImpl;
import io.temporal.internal.client.RootNexusClientInvoker;
import io.temporal.internal.client.external.GenericWorkflowClient;
import io.temporal.internal.client.external.GenericWorkflowClientImpl;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
public class NexusClientImpl implements NexusClient {

private static final Logger log = LoggerFactory.getLogger(NexusClientImpl.class);

private final WorkflowServiceStubs workflowServiceStubs;
private final NexusClientOptions options;
private final GenericWorkflowClient genericClient;
private final Scope metricsScope;
private final NexusClientCallsInterceptor nexusClientCallsInvoker;
private final List<NexusClientInterceptor> interceptors;

public static NexusClient newInstance(WorkflowServiceStubs service, NexusClientOptions options) {
enforceNonWorkflowThread();
return WorkflowThreadMarker.protectFromWorkflowThread(
new NexusClientImpl(service, options), NexusClient.class);
}

NexusClientImpl(WorkflowServiceStubs workflowServiceStubs, NexusClientOptions options) {
workflowServiceStubs =
new NamespaceInjectWorkflowServiceStubs(workflowServiceStubs, options.getNamespace());
this.workflowServiceStubs = workflowServiceStubs;
this.options = options;
this.metricsScope =
workflowServiceStubs
.getOptions()
.getMetricsScope()
.tagged(MetricsTag.defaultTags(options.getNamespace()));
this.genericClient = new GenericWorkflowClientImpl(workflowServiceStubs, metricsScope);
this.interceptors = options.getInterceptors();
this.nexusClientCallsInvoker = initializeClientInvoker();
if (log.isDebugEnabled()) {
log.debug(
"NexusClient initialized: namespace={}, interceptors={}",
options.getNamespace(),
interceptors.size());
}
}

private NexusClientCallsInterceptor initializeClientInvoker() {
NexusClientCallsInterceptor invoker = new RootNexusClientInvoker(genericClient, options);
for (NexusClientInterceptor clientInterceptor : interceptors) {
NexusClientCallsInterceptor wrapped = clientInterceptor.nexusClientCallsInterceptor(invoker);
if (wrapped == null) {
throw new IllegalStateException(
"NexusClientInterceptor "
+ clientInterceptor.getClass().getName()
+ " returned null from nexusClientCallsInterceptor; expected a non-null"
+ " NexusClientCallsInterceptor wrapping the supplied next link");
}
invoker = wrapped;
}
return invoker;
}

@Override
public WorkflowServiceStubs getWorkflowServiceStubs() {
return workflowServiceStubs;
}

@Override
public UntypedNexusOperationHandle getHandle(String operationId) {
return getHandle(operationId, null);
}

@Override
public UntypedNexusOperationHandle getHandle(String operationId, @Nullable String runId) {
return new NexusOperationHandleImpl(
operationId, runId, nexusClientCallsInvoker, options.getDataConverter());
}

@Override
public <R> NexusOperationHandle<R> getHandle(
String operationId, @Nullable String runId, Class<R> resultClass) {
return getHandle(operationId, runId, resultClass, null);
}

@Override
public <R> NexusOperationHandle<R> getHandle(
String operationId,
@Nullable String runId,
Class<R> resultClass,
@Nullable java.lang.reflect.Type resultType) {
return NexusOperationHandle.fromUntyped(getHandle(operationId, runId), resultClass, resultType);
}

@Override
public UntypedNexusServiceClient newUntypedNexusServiceClient(
String endpoint, String serviceName) {
return new UntypedNexusServiceClientImpl(
nexusClientCallsInvoker, endpoint, serviceName, options);
}

/**
* Returns the head of the interceptor chain. Package-private so service-client builders can route
* start RPCs through the chain without exposing it on the public {@link NexusClient} interface.
*/
NexusClientCallsInterceptor getNexusClientCallsInvoker() {
return nexusClientCallsInvoker;
}

@Override
public Stream<NexusOperationExecutionMetadata> listNexusOperationExecutions(
@Nullable String query) {
// Pagination is handled inside the interceptor invoker; we receive a fully materialized list
// and expose a Stream view of it to honour the public API contract.
ListNexusOperationExecutionsOutput out =
nexusClientCallsInvoker.listNexusOperationExecutions(
new ListNexusOperationExecutionsInput(query));
return out.getOperations().stream().map(NexusOperationExecutionMetadata::fromListInfo);
}

@Override
public NexusOperationExecutionCount countNexusOperationExecutions(@Nullable String query) {
CountNexusOperationExecutionsOutput out =
nexusClientCallsInvoker.countNexusOperationExecutions(
new CountNexusOperationExecutionsInput(query));
List<NexusOperationExecutionCount.AggregationGroup> publicGroups =
out.getGroups().stream()
.map(
g ->
new NexusOperationExecutionCount.AggregationGroup(
g.getCount(), g.getGroupValues()))
.collect(Collectors.toList());
return new NexusOperationExecutionCount(out.getCount(), publicGroups);
}
}
Loading
Loading