-
Notifications
You must be signed in to change notification settings - Fork 176
AI-36: Add LangGraph plugin #1448
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
brianstrauch
wants to merge
55
commits into
main
Choose a base branch
from
langgraph
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
55 commits
Select commit
Hold shift + click to select a range
cb0af36
add langgraph plugin
brianstrauch 8f30e56
add experimental package warnings
brianstrauch d1470c7
fix ruff lint
brianstrauch 68a0eb4
fix pyright lint errors
brianstrauch cbd066a
fixed some mypy lints
brianstrauch 5d1f182
fix docstring lints
brianstrauch ea24a13
copilot code review
brianstrauch 19a5052
fix mypy lint
brianstrauch 0201c89
separate graphs and entrypoints by task queue to avoid concurrent wri…
brianstrauch e98dd96
use graph.node or task_id for activity names to avoid collisions
brianstrauch e1a93fc
rm local conftest in favor of global and fix lint
brianstrauch 0eeeb00
Merge branch 'main' into langgraph
brianstrauch 58cbdc4
Merge branch 'main' into langgraph
brianstrauch 7d925b0
allow langgraph 1.1
brianstrauch 05bd7aa
uv lock
brianstrauch 2253a5f
add default_activity_options
brianstrauch 86837df
add replay test
brianstrauch dc2ed59
fix gaps in missing tests
brianstrauch c0525cc
introduce an interceptor to patch is_running only in the workflow, th…
brianstrauch b78d063
add interceptor
brianstrauch cabbb1a
Merge branch 'main' into langgraph
brianstrauch 8ef609f
remove graph and entrypoint functions in favor of direct graph usage
brianstrauch ec8244c
rename cache() to get_cache()
brianstrauch 7c54795
Merge branch 'main' into langgraph
brianstrauch 1da61a8
remove interceptor
brianstrauch c84c22f
allow metadata to be accessed from node func and test
brianstrauch 077452d
Fix import sorting in test_node_metadata.py
DABH 7716e47
Fix formatting in langgraph_plugin.py
DABH 30094dc
Fix mypy errors: add type params to StateGraph and use State() constr…
DABH 01d18ed
Merge branch 'main' into langgraph
DABH e0d766c
Fix langsmith sandbox crash when langchain_core is installed
DABH 38f2a18
Suppress basedpyright unused import warning for langchain_core preload
DABH aba75a3
Skip langgraph async tests on Python < 3.11 and warn plugin users
DABH 6f19fa8
Remove duplicate pytest import in test_interrupt.py
DABH 567f423
Fix basedpyright reportUnreachable warning on version check
DABH 3191912
Increase execution_timeout for OpenAI tests that call the real API
DABH d13d912
Revert "allow metadata to be accessed from node func and test"
brianstrauch 63e3d0c
Revert "rename cache() to get_cache()"
brianstrauch 21acfea
Revert "remove graph and entrypoint functions in favor of direct grap…
brianstrauch 4eb67e5
reimplement node metadata fixes
brianstrauch f62bc09
scope graphs and entrypoints to workflow, rename files
brianstrauch aa84892
test sync nodes and tasks, send
brianstrauch 4b915b1
support command goto/update
brianstrauch c36423b
add test for command
brianstrauch 04b56e7
raise error if node or task has a retry policy
brianstrauch ef96ba4
support runtime context
brianstrauch b742951
fix lint
brianstrauch 5ecd3b1
Merge branch 'main' into langgraph
brianstrauch f9c8322
Merge branch 'main' into langgraph
brianstrauch 6135fe0
Revert changes to langsmith test_integration.py
DABH 65036a0
code review
brianstrauch 44c7b88
Remove langchain_core from LangSmith plugin sandbox passthroughs (CI …
DABH d9187c8
Restore langchain_core to LangSmith plugin sandbox passthroughs
DABH 1f5b5e3
Merge branch 'main' into langgraph
DABH 872adf1
underscore py files in langgraph plugin dir
brianstrauch File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,156 @@ | ||
| # LangGraph Plugin for Temporal Python SDK | ||
|
|
||
| ⚠️ **This package is currently at an experimental release stage.** ⚠️ | ||
|
|
||
| This Temporal [Plugin](https://docs.temporal.io/develop/plugins-guide) allows you to run [LangGraph](https://www.langchain.com/langgraph) nodes and tasks as Temporal Activities, giving your AI workflows durable execution, automatic retries, and timeouts. It supports both the LangGraph Graph API (``StateGraph``) and Functional API (``@entrypoint`` / ``@task``). | ||
|
|
||
| ## Installation | ||
|
|
||
| ```sh | ||
| uv add temporalio[langgraph] | ||
| ``` | ||
|
|
||
| ## Plugin Initialization | ||
|
|
||
| ### Graph API | ||
|
|
||
| ```python | ||
| from temporalio.contrib.langgraph import LangGraphPlugin | ||
|
|
||
| plugin = LangGraphPlugin(graphs={"my-graph": graph}) | ||
| ``` | ||
|
|
||
| ### Functional API | ||
|
|
||
| ```python | ||
| import datetime | ||
| from temporalio.contrib.langgraph import LangGraphPlugin | ||
|
|
||
| plugin = LangGraphPlugin( | ||
| entrypoints={"my_entrypoint": my_entrypoint}, | ||
| tasks=[my_task], | ||
| ) | ||
| ``` | ||
|
|
||
| ## Checkpointer | ||
|
|
||
| If your LangGraph code requires a checkpointer (for example, if you're using interrupts), use `InMemorySaver`. | ||
| Temporal handles durability, so third-party checkpointers (like PostgreSQL or Redis) are not needed. | ||
|
|
||
| ```python | ||
| import langgraph.checkpoint.memory | ||
| import typing | ||
|
|
||
| from temporalio.contrib.langgraph import graph | ||
| from temporalio import workflow | ||
|
|
||
| @workflow.defn | ||
| class MyWorkflow: | ||
| @workflow.run | ||
| async def run(self, input: str) -> typing.Any: | ||
| g = graph("my-graph").compile( | ||
| checkpointer=langgraph.checkpoint.memory.InMemorySaver(), | ||
| ) | ||
|
|
||
| ... | ||
| ``` | ||
|
|
||
| ## Activity Options | ||
|
|
||
| Options are passed through to [`workflow.execute_activity()`](https://python.temporal.io/temporalio.workflow.html#execute_activity), which supports parameters like `start_to_close_timeout`, `retry_policy`, `schedule_to_close_timeout`, `heartbeat_timeout`, and more. | ||
|
|
||
| ### Graph API | ||
|
|
||
| Pass Activity options as node `metadata` when calling `add_node`: | ||
|
|
||
| ```python | ||
| from datetime import timedelta | ||
| from temporalio.common import RetryPolicy | ||
|
|
||
| g = StateGraph(State) | ||
| g.add_node("my_node", my_node, metadata={ | ||
| "start_to_close_timeout": timedelta(seconds=30), | ||
| "retry_policy": RetryPolicy(maximum_attempts=3), | ||
| }) | ||
| ``` | ||
|
|
||
| ### Functional API | ||
|
|
||
| Pass Activity options to the `LangGraphPlugin` constructor, keyed by task function name: | ||
|
|
||
| ```python | ||
| from datetime import timedelta | ||
| from temporalio.common import RetryPolicy | ||
| from temporalio.contrib.langgraph import LangGraphPlugin | ||
|
|
||
| plugin = LangGraphPlugin( | ||
| entrypoints={"my_entrypoint": my_entrypoint}, | ||
| tasks=[my_task], | ||
| activity_options={ | ||
| "my_task": { | ||
| "start_to_close_timeout": timedelta(seconds=30), | ||
| "retry_policy": RetryPolicy(maximum_attempts=3), | ||
| }, | ||
| }, | ||
| ) | ||
| ``` | ||
|
|
||
| ### Runtime Context | ||
|
|
||
| LangGraph's run-scoped context (`context_schema`) is reconstructed on the Activity side, so nodes and tasks can read from and write to `runtime.context`: | ||
|
|
||
| ```python | ||
| from langgraph.runtime import Runtime | ||
| from typing_extensions import TypedDict | ||
|
|
||
| from temporalio.contrib.langgraph import graph | ||
|
|
||
| class Context(TypedDict): | ||
| user_id: str | ||
|
|
||
| async def my_node(state: State, runtime: Runtime[Context]) -> dict: | ||
| return {"user": runtime.context["user_id"]} | ||
|
|
||
| # In the Workflow: | ||
| g = graph("my-graph").compile() | ||
| await g.ainvoke({...}, context=Context(user_id="alice")) | ||
| ``` | ||
|
|
||
| Your `context` object must be serializable by the configured Temporal payload converter, since it crosses the Activity boundary. | ||
|
|
||
| ## Stores are not supported | ||
|
|
||
| LangGraph's `Store` (e.g. `InMemoryStore` passed via `graph.compile(store=...)` or `@entrypoint(store=...)`) isn't accessible inside Activity-wrapped nodes: the Store holds live state that can't cross the Activity boundary, and Activities may run on a different worker than the Workflow. If you pass a store, the plugin logs a warning on first use and `runtime.store` is `None` inside nodes. | ||
|
|
||
| Use Workflow state for per-run memory, or an external database (Postgres/Redis/etc.) configured on each worker if you need shared memory across runs. | ||
|
|
||
| ## Running in the Workflow | ||
|
|
||
| To run a node or task directly in the Workflow, set `execute_in` to `"workflow"`: | ||
|
|
||
| ```python | ||
| # Graph API | ||
| graph.add_node("my_node", my_node, metadata={"execute_in": "workflow"}) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just to call out, in other integrations, this is the default and activities are done explicitly. Do we want to maintain that discrepancy? |
||
|
|
||
| # Functional API | ||
| plugin = LangGraphPlugin( | ||
| tasks=[my_task], | ||
| activity_options={"my_task": {"execute_in": "workflow"}}, | ||
| ) | ||
| ``` | ||
|
|
||
| ## Running Tests | ||
|
|
||
| Install dependencies: | ||
|
|
||
| ```sh | ||
| uv sync --all-extras | ||
| ``` | ||
|
|
||
| Run the test suite: | ||
|
|
||
| ```sh | ||
| uv run pytest tests/contrib/langgraph | ||
| ``` | ||
|
|
||
| Tests start a local Temporal dev server automatically — no external server needed. | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| """LangGraph plugin for Temporal SDK. | ||
|
|
||
| .. warning:: | ||
| This package is experimental and may change in future versions. | ||
| Use with caution in production environments. | ||
|
|
||
| This plugin runs `LangGraph <https://github.com/langchain-ai/langgraph>`_ nodes | ||
| and tasks as Temporal Activities, giving your AI agent workflows durable | ||
| execution, automatic retries, and timeouts. It supports both the LangGraph Graph | ||
| API (``StateGraph``) and Functional API (``@entrypoint`` / ``@task``). | ||
| """ | ||
|
|
||
| from temporalio.contrib.langgraph._plugin import ( | ||
| LangGraphPlugin, | ||
| cache, | ||
| entrypoint, | ||
| graph, | ||
| ) | ||
|
|
||
| __all__ = [ | ||
| "LangGraphPlugin", | ||
| "entrypoint", | ||
| "cache", | ||
| "graph", | ||
| ] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,155 @@ | ||
| """Activity wrappers for executing LangGraph nodes and tasks.""" | ||
|
|
||
| from collections.abc import Awaitable | ||
| from dataclasses import dataclass | ||
| from inspect import iscoroutinefunction, signature | ||
| from typing import Any, Callable | ||
|
|
||
| from langgraph.errors import GraphInterrupt | ||
| from langgraph.types import Command, Interrupt | ||
|
|
||
| from temporalio import workflow | ||
| from temporalio.contrib.langgraph._langgraph_config import ( | ||
| get_langgraph_config, | ||
| set_langgraph_config, | ||
| ) | ||
| from temporalio.contrib.langgraph._task_cache import ( | ||
| cache_key, | ||
| cache_lookup, | ||
| cache_put, | ||
| ) | ||
|
|
||
| # Per-run dedupe so we only warn once when a user passes a Store via | ||
| # graph.compile(store=...) / @entrypoint(store=...). Cleared by | ||
| # LangGraphInterceptor.execute_workflow on workflow exit. | ||
| _warned_store_runs: set[str] = set() | ||
|
|
||
|
|
||
| def clear_store_warning(run_id: str) -> None: | ||
| """Drop the store-warning dedupe entry for a workflow run.""" | ||
| _warned_store_runs.discard(run_id) | ||
|
|
||
|
|
||
| @dataclass | ||
| class ActivityInput: | ||
| """Input for a LangGraph activity, containing args, kwargs, and config.""" | ||
|
|
||
| args: tuple[Any, ...] | ||
| kwargs: dict[str, Any] | ||
| langgraph_config: dict[str, Any] | ||
|
|
||
|
|
||
| @dataclass | ||
| class ActivityOutput: | ||
| """Output from an Activity, containing result, command, or interrupts.""" | ||
|
|
||
| result: Any = None | ||
| langgraph_command: Any = None | ||
| langgraph_interrupts: tuple[Interrupt] | None = None | ||
|
|
||
|
|
||
| def wrap_activity( | ||
| func: Callable, | ||
| ) -> Callable[[ActivityInput], Awaitable[ActivityOutput]]: | ||
| """Wrap a function as a Temporal activity that handles LangGraph config and interrupts.""" | ||
| # Graph nodes declare `runtime: Runtime[Ctx]` in their signature; tasks | ||
| # don't and instead reach for Runtime via get_runtime(). We re-inject the | ||
| # reconstructed Runtime only when the user function asks. | ||
| accepts_runtime = "runtime" in signature(func).parameters | ||
|
|
||
| async def wrapper(input: ActivityInput) -> ActivityOutput: | ||
| runtime = set_langgraph_config(input.langgraph_config) | ||
| kwargs = dict(input.kwargs) | ||
| if accepts_runtime: | ||
| kwargs["runtime"] = runtime | ||
| try: | ||
| if iscoroutinefunction(func): | ||
| result = await func(*input.args, **kwargs) | ||
| else: | ||
| result = func(*input.args, **kwargs) | ||
| if isinstance(result, Command): | ||
| return ActivityOutput(langgraph_command=result) | ||
| return ActivityOutput(result=result) | ||
| except GraphInterrupt as e: | ||
| return ActivityOutput(langgraph_interrupts=e.args[0]) | ||
|
|
||
| return wrapper | ||
|
|
||
|
|
||
| def wrap_execute_activity( | ||
| afunc: Callable[[ActivityInput], Awaitable[ActivityOutput]], | ||
| task_id: str = "", | ||
| **execute_activity_kwargs: Any, | ||
| ) -> Callable[..., Any]: | ||
| """Wrap an activity function to be called via workflow.execute_activity with caching.""" | ||
|
|
||
| async def wrapper(*args: Any, **kwargs: Any) -> Any: | ||
| # LangGraph may inject a RunnableConfig as the 'config' kwarg. Strip it | ||
| # down to a serializable subset (metadata + tags) so it can cross the | ||
| # activity boundary; callbacks, stores, etc. aren't serializable. | ||
| if "config" in kwargs: | ||
| orig = kwargs["config"] or {} | ||
| kwargs["config"] = { | ||
| "metadata": dict(orig.get("metadata") or {}), | ||
| "tags": list(orig.get("tags") or []), | ||
| } | ||
|
|
||
| # LangGraph may inject a Runtime as the 'runtime' kwarg. It's | ||
| # reconstructed on the activity side from the serialized langgraph | ||
| # config, so drop the live Runtime from the kwargs that cross the | ||
| # activity boundary (it holds non-serializable stream_writer, store). | ||
| runtime = kwargs.pop("runtime", None) | ||
| run_id = workflow.info().run_id | ||
| if ( | ||
| getattr(runtime, "store", None) is not None | ||
| and run_id not in _warned_store_runs | ||
| ): | ||
| _warned_store_runs.add(run_id) | ||
| workflow.logger.warning( | ||
| "LangGraph Store passed via compile(store=...) / @entrypoint(store=...) " | ||
| "is not accessible inside activity-wrapped nodes and tasks: the Store " | ||
| "object isn't serializable across the activity boundary, and activities " | ||
| "may run on a different worker than the workflow. Use a backend-backed " | ||
| "store (Postgres/Redis) configured on each worker if you need shared " | ||
| "memory, or use workflow state for per-run memory." | ||
| ) | ||
|
|
||
| langgraph_config = get_langgraph_config() | ||
|
|
||
| # Check task result cache (for continue-as-new deduplication). | ||
| key = ( | ||
| cache_key(task_id, args, kwargs, langgraph_config.get("context")) | ||
| if task_id | ||
| else "" | ||
| ) | ||
| if task_id: | ||
| found, cached = cache_lookup(key) | ||
| if found: | ||
| return cached | ||
|
|
||
| input = ActivityInput( | ||
| args=args, kwargs=kwargs, langgraph_config=langgraph_config | ||
| ) | ||
| output = await workflow.execute_activity( | ||
| afunc, input, **execute_activity_kwargs | ||
| ) | ||
| if output.langgraph_interrupts is not None: | ||
| raise GraphInterrupt(output.langgraph_interrupts) | ||
|
|
||
| result = output.result | ||
| if output.langgraph_command is not None: | ||
| cmd = output.langgraph_command | ||
| result = Command( | ||
| graph=cmd["graph"], | ||
| update=cmd["update"], | ||
| resume=cmd["resume"], | ||
| goto=cmd["goto"], | ||
| ) | ||
|
|
||
| # Store in cache for future continue-as-new cycles. | ||
| if task_id: | ||
| cache_put(key, result) | ||
|
|
||
| return result | ||
|
|
||
| return wrapper |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.