Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
cb0af36
add langgraph plugin
brianstrauch Apr 14, 2026
8f30e56
add experimental package warnings
brianstrauch Apr 14, 2026
d1470c7
fix ruff lint
brianstrauch Apr 14, 2026
68a0eb4
fix pyright lint errors
brianstrauch Apr 14, 2026
cbd066a
fixed some mypy lints
brianstrauch Apr 14, 2026
5d1f182
fix docstring lints
brianstrauch Apr 14, 2026
ea24a13
copilot code review
brianstrauch Apr 14, 2026
19a5052
fix mypy lint
brianstrauch Apr 15, 2026
0201c89
separate graphs and entrypoints by task queue to avoid concurrent wri…
brianstrauch Apr 15, 2026
e98dd96
use graph.node or task_id for activity names to avoid collisions
brianstrauch Apr 15, 2026
e1a93fc
rm local conftest in favor of global and fix lint
brianstrauch Apr 15, 2026
0eeeb00
Merge branch 'main' into langgraph
brianstrauch Apr 15, 2026
58cbdc4
Merge branch 'main' into langgraph
brianstrauch Apr 16, 2026
7d925b0
allow langgraph 1.1
brianstrauch Apr 16, 2026
05bd7aa
uv lock
brianstrauch Apr 16, 2026
2253a5f
add default_activity_options
brianstrauch Apr 16, 2026
86837df
add replay test
brianstrauch Apr 16, 2026
dc2ed59
fix gaps in missing tests
brianstrauch Apr 16, 2026
c0525cc
introduce an interceptor to patch is_running only in the workflow, th…
brianstrauch Apr 17, 2026
b78d063
add interceptor
brianstrauch Apr 17, 2026
cabbb1a
Merge branch 'main' into langgraph
brianstrauch Apr 17, 2026
8ef609f
remove graph and entrypoint functions in favor of direct graph usage
brianstrauch Apr 17, 2026
ec8244c
rename cache() to get_cache()
brianstrauch Apr 17, 2026
7c54795
Merge branch 'main' into langgraph
brianstrauch Apr 17, 2026
1da61a8
remove interceptor
brianstrauch Apr 17, 2026
c84c22f
allow metadata to be accessed from node func and test
brianstrauch Apr 17, 2026
077452d
Fix import sorting in test_node_metadata.py
DABH Apr 18, 2026
7716e47
Fix formatting in langgraph_plugin.py
DABH Apr 18, 2026
30094dc
Fix mypy errors: add type params to StateGraph and use State() constr…
DABH Apr 18, 2026
01d18ed
Merge branch 'main' into langgraph
DABH Apr 18, 2026
e0d766c
Fix langsmith sandbox crash when langchain_core is installed
DABH Apr 18, 2026
38f2a18
Suppress basedpyright unused import warning for langchain_core preload
DABH Apr 19, 2026
aba75a3
Skip langgraph async tests on Python < 3.11 and warn plugin users
DABH Apr 19, 2026
6f19fa8
Remove duplicate pytest import in test_interrupt.py
DABH Apr 19, 2026
567f423
Fix basedpyright reportUnreachable warning on version check
DABH Apr 19, 2026
3191912
Increase execution_timeout for OpenAI tests that call the real API
DABH Apr 19, 2026
d13d912
Revert "allow metadata to be accessed from node func and test"
brianstrauch Apr 20, 2026
63e3d0c
Revert "rename cache() to get_cache()"
brianstrauch Apr 20, 2026
21acfea
Revert "remove graph and entrypoint functions in favor of direct grap…
brianstrauch Apr 20, 2026
4eb67e5
reimplement node metadata fixes
brianstrauch Apr 20, 2026
f62bc09
scope graphs and entrypoints to workflow, rename files
brianstrauch Apr 20, 2026
aa84892
test sync nodes and tasks, send
brianstrauch Apr 21, 2026
4b915b1
support command goto/update
brianstrauch Apr 21, 2026
c36423b
add test for command
brianstrauch Apr 21, 2026
04b56e7
raise error if node or task has a retry policy
brianstrauch Apr 21, 2026
ef96ba4
support runtime context
brianstrauch Apr 21, 2026
b742951
fix lint
brianstrauch Apr 22, 2026
5ecd3b1
Merge branch 'main' into langgraph
brianstrauch Apr 22, 2026
f9c8322
Merge branch 'main' into langgraph
brianstrauch Apr 22, 2026
6135fe0
Revert changes to langsmith test_integration.py
DABH Apr 23, 2026
65036a0
code review
brianstrauch Apr 23, 2026
44c7b88
Remove langchain_core from LangSmith plugin sandbox passthroughs (CI …
DABH Apr 23, 2026
d9187c8
Restore langchain_core to LangSmith plugin sandbox passthroughs
DABH Apr 23, 2026
1f5b5e3
Merge branch 'main' into langgraph
DABH Apr 23, 2026
872adf1
underscore py files in langgraph plugin dir
brianstrauch Apr 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ opentelemetry = ["opentelemetry-api>=1.11.1,<2", "opentelemetry-sdk>=1.11.1,<2"]
pydantic = ["pydantic>=2.0.0,<3"]
openai-agents = ["openai-agents>=0.14.0", "mcp>=1.9.4, <2"]
google-adk = ["google-adk>=1.27.0,<2"]
langgraph = ["langgraph>=1.1.0"]
langsmith = ["langsmith>=0.7.0,<0.8"]
lambda-worker-otel = [
"opentelemetry-api>=1.11.1,<2",
Expand Down Expand Up @@ -79,6 +80,7 @@ dev = [
"pytest-rerunfailures>=16.1",
"pytest-xdist>=3.6,<4",
"moto[s3,server]>=5",
"langgraph>=1.1.0",
"langsmith>=0.7.0,<0.8",
"setuptools<82",
"opentelemetry-exporter-otlp-proto-grpc>=1.11.1,<2",
Expand Down
156 changes: 156 additions & 0 deletions temporalio/contrib/langgraph/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
# LangGraph Plugin for Temporal Python SDK

⚠️ **This package is currently at an experimental release stage.** ⚠️

This Temporal [Plugin](https://docs.temporal.io/develop/plugins-guide) allows you to run [LangGraph](https://www.langchain.com/langgraph) nodes and tasks as Temporal Activities, giving your AI workflows durable execution, automatic retries, and timeouts. It supports both the LangGraph Graph API (``StateGraph``) and Functional API (``@entrypoint`` / ``@task``).

## Installation

```sh
uv add temporalio[langgraph]
```

## Plugin Initialization

### Graph API

```python
from temporalio.contrib.langgraph import LangGraphPlugin

plugin = LangGraphPlugin(graphs={"my-graph": graph})
Comment thread
brianstrauch marked this conversation as resolved.
```

### Functional API

```python
import datetime
from temporalio.contrib.langgraph import LangGraphPlugin

plugin = LangGraphPlugin(
entrypoints={"my_entrypoint": my_entrypoint},
tasks=[my_task],
)
```

## Checkpointer

If your LangGraph code requires a checkpointer (for example, if you're using interrupts), use `InMemorySaver`.
Temporal handles durability, so third-party checkpointers (like PostgreSQL or Redis) are not needed.

```python
import langgraph.checkpoint.memory
import typing

from temporalio.contrib.langgraph import graph
from temporalio import workflow

@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self, input: str) -> typing.Any:
g = graph("my-graph").compile(
checkpointer=langgraph.checkpoint.memory.InMemorySaver(),
)

...
```

## Activity Options

Options are passed through to [`workflow.execute_activity()`](https://python.temporal.io/temporalio.workflow.html#execute_activity), which supports parameters like `start_to_close_timeout`, `retry_policy`, `schedule_to_close_timeout`, `heartbeat_timeout`, and more.

### Graph API

Pass Activity options as node `metadata` when calling `add_node`:

```python
from datetime import timedelta
from temporalio.common import RetryPolicy

g = StateGraph(State)
g.add_node("my_node", my_node, metadata={
"start_to_close_timeout": timedelta(seconds=30),
"retry_policy": RetryPolicy(maximum_attempts=3),
})
```

### Functional API

Pass Activity options to the `LangGraphPlugin` constructor, keyed by task function name:

```python
from datetime import timedelta
from temporalio.common import RetryPolicy
from temporalio.contrib.langgraph import LangGraphPlugin

plugin = LangGraphPlugin(
entrypoints={"my_entrypoint": my_entrypoint},
tasks=[my_task],
activity_options={
"my_task": {
"start_to_close_timeout": timedelta(seconds=30),
"retry_policy": RetryPolicy(maximum_attempts=3),
},
},
)
```

### Runtime Context

LangGraph's run-scoped context (`context_schema`) is reconstructed on the Activity side, so nodes and tasks can read from and write to `runtime.context`:

```python
from langgraph.runtime import Runtime
from typing_extensions import TypedDict

from temporalio.contrib.langgraph import graph

class Context(TypedDict):
user_id: str

async def my_node(state: State, runtime: Runtime[Context]) -> dict:
return {"user": runtime.context["user_id"]}

# In the Workflow:
g = graph("my-graph").compile()
await g.ainvoke({...}, context=Context(user_id="alice"))
```

Your `context` object must be serializable by the configured Temporal payload converter, since it crosses the Activity boundary.

## Stores are not supported

LangGraph's `Store` (e.g. `InMemoryStore` passed via `graph.compile(store=...)` or `@entrypoint(store=...)`) isn't accessible inside Activity-wrapped nodes: the Store holds live state that can't cross the Activity boundary, and Activities may run on a different worker than the Workflow. If you pass a store, the plugin logs a warning on first use and `runtime.store` is `None` inside nodes.

Use Workflow state for per-run memory, or an external database (Postgres/Redis/etc.) configured on each worker if you need shared memory across runs.

## Running in the Workflow

To run a node or task directly in the Workflow, set `execute_in` to `"workflow"`:

```python
# Graph API
graph.add_node("my_node", my_node, metadata={"execute_in": "workflow"})
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to call out, in other integrations, this is the default and activities are done explicitly. Do we want to maintain that discrepancy?


# Functional API
plugin = LangGraphPlugin(
tasks=[my_task],
activity_options={"my_task": {"execute_in": "workflow"}},
)
```

## Running Tests

Install dependencies:

```sh
uv sync --all-extras
```

Run the test suite:

```sh
uv run pytest tests/contrib/langgraph
```

Tests start a local Temporal dev server automatically — no external server needed.
25 changes: 25 additions & 0 deletions temporalio/contrib/langgraph/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""LangGraph plugin for Temporal SDK.

.. warning::
This package is experimental and may change in future versions.
Use with caution in production environments.

This plugin runs `LangGraph <https://github.com/langchain-ai/langgraph>`_ nodes
and tasks as Temporal Activities, giving your AI agent workflows durable
execution, automatic retries, and timeouts. It supports both the LangGraph Graph
API (``StateGraph``) and Functional API (``@entrypoint`` / ``@task``).
"""

from temporalio.contrib.langgraph._plugin import (
LangGraphPlugin,
cache,
entrypoint,
graph,
)

__all__ = [
"LangGraphPlugin",
"entrypoint",
"cache",
"graph",
]
155 changes: 155 additions & 0 deletions temporalio/contrib/langgraph/_activity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
"""Activity wrappers for executing LangGraph nodes and tasks."""

from collections.abc import Awaitable
from dataclasses import dataclass
from inspect import iscoroutinefunction, signature
from typing import Any, Callable

from langgraph.errors import GraphInterrupt
from langgraph.types import Command, Interrupt

from temporalio import workflow
from temporalio.contrib.langgraph._langgraph_config import (
get_langgraph_config,
set_langgraph_config,
)
from temporalio.contrib.langgraph._task_cache import (
cache_key,
cache_lookup,
cache_put,
)

# Per-run dedupe so we only warn once when a user passes a Store via
# graph.compile(store=...) / @entrypoint(store=...). Cleared by
# LangGraphInterceptor.execute_workflow on workflow exit.
_warned_store_runs: set[str] = set()


def clear_store_warning(run_id: str) -> None:
"""Drop the store-warning dedupe entry for a workflow run."""
_warned_store_runs.discard(run_id)


@dataclass
class ActivityInput:
"""Input for a LangGraph activity, containing args, kwargs, and config."""

args: tuple[Any, ...]
kwargs: dict[str, Any]
langgraph_config: dict[str, Any]


@dataclass
class ActivityOutput:
"""Output from an Activity, containing result, command, or interrupts."""

result: Any = None
langgraph_command: Any = None
langgraph_interrupts: tuple[Interrupt] | None = None


def wrap_activity(
func: Callable,
) -> Callable[[ActivityInput], Awaitable[ActivityOutput]]:
"""Wrap a function as a Temporal activity that handles LangGraph config and interrupts."""
# Graph nodes declare `runtime: Runtime[Ctx]` in their signature; tasks
# don't and instead reach for Runtime via get_runtime(). We re-inject the
# reconstructed Runtime only when the user function asks.
accepts_runtime = "runtime" in signature(func).parameters

async def wrapper(input: ActivityInput) -> ActivityOutput:
runtime = set_langgraph_config(input.langgraph_config)
kwargs = dict(input.kwargs)
if accepts_runtime:
kwargs["runtime"] = runtime
try:
if iscoroutinefunction(func):
result = await func(*input.args, **kwargs)
else:
result = func(*input.args, **kwargs)
if isinstance(result, Command):
return ActivityOutput(langgraph_command=result)
return ActivityOutput(result=result)
except GraphInterrupt as e:
return ActivityOutput(langgraph_interrupts=e.args[0])

return wrapper


def wrap_execute_activity(
afunc: Callable[[ActivityInput], Awaitable[ActivityOutput]],
task_id: str = "",
**execute_activity_kwargs: Any,
) -> Callable[..., Any]:
"""Wrap an activity function to be called via workflow.execute_activity with caching."""

async def wrapper(*args: Any, **kwargs: Any) -> Any:
# LangGraph may inject a RunnableConfig as the 'config' kwarg. Strip it
# down to a serializable subset (metadata + tags) so it can cross the
# activity boundary; callbacks, stores, etc. aren't serializable.
if "config" in kwargs:
orig = kwargs["config"] or {}
kwargs["config"] = {
"metadata": dict(orig.get("metadata") or {}),
"tags": list(orig.get("tags") or []),
}

# LangGraph may inject a Runtime as the 'runtime' kwarg. It's
# reconstructed on the activity side from the serialized langgraph
# config, so drop the live Runtime from the kwargs that cross the
# activity boundary (it holds non-serializable stream_writer, store).
runtime = kwargs.pop("runtime", None)
run_id = workflow.info().run_id
if (
getattr(runtime, "store", None) is not None
and run_id not in _warned_store_runs
):
_warned_store_runs.add(run_id)
workflow.logger.warning(
"LangGraph Store passed via compile(store=...) / @entrypoint(store=...) "
"is not accessible inside activity-wrapped nodes and tasks: the Store "
"object isn't serializable across the activity boundary, and activities "
"may run on a different worker than the workflow. Use a backend-backed "
"store (Postgres/Redis) configured on each worker if you need shared "
"memory, or use workflow state for per-run memory."
)

langgraph_config = get_langgraph_config()

# Check task result cache (for continue-as-new deduplication).
key = (
cache_key(task_id, args, kwargs, langgraph_config.get("context"))
if task_id
else ""
)
if task_id:
found, cached = cache_lookup(key)
if found:
return cached

input = ActivityInput(
args=args, kwargs=kwargs, langgraph_config=langgraph_config
)
output = await workflow.execute_activity(
afunc, input, **execute_activity_kwargs
)
if output.langgraph_interrupts is not None:
raise GraphInterrupt(output.langgraph_interrupts)

result = output.result
if output.langgraph_command is not None:
cmd = output.langgraph_command
result = Command(
graph=cmd["graph"],
update=cmd["update"],
resume=cmd["resume"],
goto=cmd["goto"],
)

# Store in cache for future continue-as-new cycles.
if task_id:
cache_put(key, result)

return result

return wrapper
Loading
Loading