1 change: 1 addition & 0 deletions README.md
Expand Up @@ -72,6 +72,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [gevent_async](gevent_async) - Combine gevent and Temporal.
* [hello_standalone_activity](hello_standalone_activity) - Use activities without using a workflow.
* [langchain](langchain) - Orchestrate workflows for LangChain.
* [langgraph_plugin](langgraph_plugin) - Run LangGraph workflows as durable Temporal workflows (Graph API and Functional API).
* [message_passing/introduction](message_passing/introduction/) - Introduction to queries, signals, and updates.
* [message_passing/safe_message_handlers](message_passing/safe_message_handlers/) - Safely handling updates and signals.
* [message_passing/update_with_start/lazy_initialization](message_passing/update_with_start/lazy_initialization/) - Use update-with-start to update a Shopping Cart, starting it if it does not exist.
67 changes: 67 additions & 0 deletions langgraph_plugin/README.md
@@ -0,0 +1,67 @@
# LangGraph Plugin Samples

These samples demonstrate the [Temporal LangGraph plugin](https://github.com/temporalio/sdk-python/pull/1448), which runs LangGraph workflows as durable Temporal workflows. Each LangGraph graph node (Graph API) or `@task` (Functional API) executes as a Temporal activity with automatic retries, timeouts, and crash recovery.

Samples are organized by API style:

- **Graph API** (`graph_api/`) -- Define workflows as `StateGraph` with nodes and edges.
- **Functional API** (`functional_api/`) -- Define workflows with `@task` and `@entrypoint` decorators for an imperative programming style.

## Samples

| Sample | Graph API | Functional API | Description |
|--------|:---------:|:--------------:|-------------|
| **Hello World** | [graph_api/hello_world](graph_api/hello_world) | [functional_api/hello_world](functional_api/hello_world) | Minimal sample -- single node/task that processes a query string. Start here. |
| **Human-in-the-loop** | [graph_api/human_in_the_loop](graph_api/human_in_the_loop) | [functional_api/human_in_the_loop](functional_api/human_in_the_loop) | Chatbot that uses `interrupt()` to pause for human approval, Temporal signals to receive feedback, and queries to expose the pending draft. |
| **Continue-as-new** | [graph_api/continue_as_new](graph_api/continue_as_new) | [functional_api/continue_as_new](functional_api/continue_as_new) | Multi-stage data pipeline that uses `continue-as-new` with task result caching so previously-completed stages are not re-executed. |
| **ReAct Agent** | [graph_api/react_agent](graph_api/react_agent) | [functional_api/react_agent](functional_api/react_agent) | Tool-calling agent loop. Graph API uses conditional edges; Functional API uses a `while` loop. |
| **Control Flow** | -- | [functional_api/control_flow](functional_api/control_flow) | Demonstrates parallel task execution, `for` loops, and `if/else` branching -- patterns that are natural in the Functional API. |
| **LangSmith Tracing** | [graph_api/langsmith_tracing](graph_api/langsmith_tracing) | [functional_api/langsmith_tracing](functional_api/langsmith_tracing) | Combines `LangGraphPlugin` with Temporal's `LangSmithPlugin` for durable execution + full observability of LLM calls. Requires API keys. |

## Prerequisites

1. Install dependencies:

```bash
uv sync --group langgraph
```

2. Start a [Temporal dev server](https://docs.temporal.io/cli#start-dev-server):

```bash
temporal server start-dev
```

## Running a Sample

Each sample has two scripts -- start the worker first, then the workflow starter in a separate terminal.

```bash
# Terminal 1: start the worker
uv run langgraph_plugin/<api>/<sample>/run_worker.py

# Terminal 2: start the workflow
uv run langgraph_plugin/<api>/<sample>/run_workflow.py
```

For example, to run the Graph API human-in-the-loop chatbot:

```bash
# Terminal 1
uv run langgraph_plugin/graph_api/human_in_the_loop/run_worker.py

# Terminal 2
uv run langgraph_plugin/graph_api/human_in_the_loop/run_workflow.py
```

## Key Features Demonstrated

- **Durable execution** -- Every graph node / `@task` runs as a Temporal activity with configurable timeouts and retry policies.
- **Human-in-the-loop** -- LangGraph's `interrupt()` pauses the graph; Temporal signals deliver human input; queries expose pending state to UIs.
- **Continue-as-new with caching** -- `get_cache()` captures completed task results; passing the cache to the next execution avoids re-running them.
- **Conditional routing** -- Graph API's `add_conditional_edges` and Functional API's native `if/else`/`while` for agent loops.
- **Parallel execution** -- Functional API launches multiple tasks concurrently by creating futures before awaiting them.
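The parallel-execution pattern in the last bullet can be sketched in plain `asyncio` (an analogy only, not the plugin's durable-activity machinery; `validate_item` and its validation rule are assumptions for illustration): futures are created for every item before any of them is awaited, so all of them run concurrently.

```python
import asyncio


async def validate_item(item: str) -> bool:
    # Stand-in for a durable activity call; hypothetical validation rule.
    await asyncio.sleep(0)
    return not item.startswith("INVALID")


async def validate_all(items: list[str]) -> list[bool]:
    # Create all futures first, then await them, so the validations
    # run concurrently instead of one at a time.
    futures = [asyncio.ensure_future(validate_item(i)) for i in items]
    return [await f for f in futures]


results = asyncio.run(validate_all(["Fix login bug", "INVALID:", "Update README"]))
```

With the LangGraph Functional API the shape is the same: calling a `@task` returns a future-like object, and awaiting happens in a second pass.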

## Related

- [SDK PR: LangGraph plugin](https://github.com/temporalio/sdk-python/pull/1448)
1 change: 1 addition & 0 deletions langgraph_plugin/__init__.py
@@ -0,0 +1 @@
"""Temporal LangGraph plugin samples."""
1 change: 1 addition & 0 deletions langgraph_plugin/functional_api/__init__.py
@@ -0,0 +1 @@
"""LangGraph Functional API samples using @task and @entrypoint."""
36 changes: 36 additions & 0 deletions langgraph_plugin/functional_api/continue_as_new/README.md
@@ -0,0 +1,36 @@
# Continue-as-New with Caching (Functional API)

Same pattern as the Graph API version, using `@task` and `@entrypoint` decorators.

## What This Sample Demonstrates

- Task result caching across continue-as-new boundaries with `get_cache()`
- Restoring cached results on the next execution by passing the cache back in with `set_cache(...)`
- Each `@task` executes exactly once despite multiple workflow invocations
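The caching behavior in these bullets can be sketched as a plain-Python analogy; `run_cached` is a hypothetical helper for illustration, not the plugin's API (the plugin's `get_cache()`/`set_cache()` manage this bookkeeping automatically).

```python
# Hypothetical analogy of task-result caching across executions.
cache: dict[str, int] = {}


def run_cached(name: str, fn, arg: int) -> int:
    if name in cache:
        # Later executions: return the recorded result instantly.
        return cache[name]
    cache[name] = fn(arg)  # First execution: run the task and record it.
    return cache[name]


first = run_cached("extract", lambda d: d * 2, 10)   # executes the task
second = run_cached("extract", lambda d: d * 2, 10)  # served from the cache
```

Carrying `cache` into the next continue-as-new execution is what lets every task run exactly once overall.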

## How It Works

1. Three tasks run sequentially: `extract` (x2) -> `transform` (+50) -> `load` (x3).
2. After the first invocation, the workflow continues-as-new with the cache.
3. On subsequent invocations, all tasks return cached results instantly.
4. Input 10 -> 20 -> 70 -> 210.
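The arithmetic in the steps above can be checked with plain functions mirroring the sample's three tasks:

```python
def extract(data: int) -> int:
    return data * 2   # stage 1: 10 -> 20


def transform(data: int) -> int:
    return data + 50  # stage 2: 20 -> 70


def load(data: int) -> int:
    return data * 3   # stage 3: 70 -> 210


result = load(transform(extract(10)))
```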

## Running the Sample

Prerequisites: `uv sync --group langgraph` and a running Temporal dev server.

```bash
# Terminal 1
uv run langgraph_plugin/functional_api/continue_as_new/run_worker.py

# Terminal 2
uv run langgraph_plugin/functional_api/continue_as_new/run_workflow.py
```

## Files

| File | Description |
|------|-------------|
| `workflow.py` | `@task` functions, `@entrypoint`, `PipelineInput`, and `PipelineFunctionalWorkflow` |
| `run_worker.py` | Registers tasks and entrypoint with `LangGraphPlugin`, starts worker |
| `run_workflow.py` | Executes the pipeline workflow and prints the result |
@@ -0,0 +1 @@
"""Continue-as-new pipeline with task result caching."""
35 changes: 35 additions & 0 deletions langgraph_plugin/functional_api/continue_as_new/run_worker.py
@@ -0,0 +1,35 @@
"""Worker for the continue-as-new pipeline (Functional API)."""

import asyncio
import os

from temporalio.client import Client
from temporalio.contrib.langgraph import LangGraphPlugin
from temporalio.worker import Worker

from langgraph_plugin.functional_api.continue_as_new.workflow import (
PipelineFunctionalWorkflow,
activity_options,
all_tasks,
)


async def main() -> None:
client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"))
plugin = LangGraphPlugin(
tasks=all_tasks,
activity_options=activity_options,
)

worker = Worker(
client,
task_queue="langgraph-pipeline-functional",
workflows=[PipelineFunctionalWorkflow],
plugins=[plugin],
)
print("Worker started. Ctrl+C to exit.")
await worker.run()


if __name__ == "__main__":
asyncio.run(main())
31 changes: 31 additions & 0 deletions langgraph_plugin/functional_api/continue_as_new/run_workflow.py
@@ -0,0 +1,31 @@
"""Start the continue-as-new pipeline workflow (Functional API)."""

import asyncio
import os
from datetime import timedelta

from temporalio.client import Client

from langgraph_plugin.functional_api.continue_as_new.workflow import (
PipelineFunctionalWorkflow,
PipelineInput,
)


async def main() -> None:
client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"))

result = await client.execute_workflow(
PipelineFunctionalWorkflow.run,
PipelineInput(data=10),
id="pipeline-functional-workflow",
task_queue="langgraph-pipeline-functional",
execution_timeout=timedelta(seconds=60),
)

# 10*2=20 -> 20+50=70 -> 70*3=210
print(f"Pipeline result: {result}")


if __name__ == "__main__":
asyncio.run(main())
80 changes: 80 additions & 0 deletions langgraph_plugin/functional_api/continue_as_new/workflow.py
@@ -0,0 +1,80 @@
"""Continue-as-new with caching using the LangGraph Functional API with Temporal.

Same pattern as the Graph API version, but using @task and @entrypoint decorators.
"""

from dataclasses import dataclass
from datetime import timedelta
from typing import Any

from langgraph.func import entrypoint as lg_entrypoint
from langgraph.func import task
from temporalio import workflow
from temporalio.contrib.langgraph import get_cache, set_cache


@task
def extract(data: int) -> int:
"""Stage 1: Extract -- simulate data extraction by doubling the input."""
return data * 2


@task
def transform(data: int) -> int:
"""Stage 2: Transform -- simulate transformation by adding 50."""
return data + 50


@task
def load(data: int) -> int:
"""Stage 3: Load -- simulate loading by tripling the result."""
return data * 3


@lg_entrypoint()
async def pipeline_entrypoint(data: int) -> dict:
"""Run the 3-stage pipeline: extract -> transform -> load."""
extracted = await extract(data)
transformed = await transform(extracted)
loaded = await load(transformed)
return {"result": loaded}


all_tasks = [extract, transform, load]

activity_options = {
t.func.__name__: {"start_to_close_timeout": timedelta(seconds=30)}
for t in all_tasks
}


@dataclass
class PipelineInput:
data: int
cache: dict[str, Any] | None = None
phase: int = 1


@workflow.defn
class PipelineFunctionalWorkflow:
"""Runs the pipeline, continuing-as-new after each phase.

Input 10: 10*2=20 -> 20+50=70 -> 70*3=210
Each task executes once; phases 2 and 3 use cached results.
"""

@workflow.run
async def run(self, input_data: PipelineInput) -> dict[str, Any]:
set_cache(input_data.cache)
result = await pipeline_entrypoint.ainvoke(input_data.data)

if input_data.phase < 3:
workflow.continue_as_new(
PipelineInput(
data=input_data.data,
cache=get_cache(),
phase=input_data.phase + 1,
)
)

return result
37 changes: 37 additions & 0 deletions langgraph_plugin/functional_api/control_flow/README.md
@@ -0,0 +1,37 @@
# Control Flow (Functional API)

Demonstrates the Functional API's strength for complex control flow: parallel execution, sequential loops, and conditional branching — all as natural Python code.

## What This Sample Demonstrates

- **Parallel execution**: launching multiple tasks concurrently by creating futures before awaiting
- **For loops**: processing items sequentially with `for item in items`
- **If/else branching**: routing items based on classification results
- Why the Functional API is ideal for programmatic composition patterns

## How It Works

1. A batch of items is validated **in parallel** — all `validate_item` tasks launch concurrently.
2. Valid items are processed **sequentially** in a for loop.
3. Each item is classified, then routed via **if/else** to `process_urgent` or `process_normal`.
4. Results are aggregated with a `summarize` task.
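The classify-then-route step can be sketched as ordinary Python; the classification rule and the bodies of `process_urgent`/`process_normal` below are assumptions for illustration, not the sample's actual logic.

```python
# Hypothetical sketch of step 3: classify an item, then branch with if/else.
def classify(item: str) -> str:
    return "urgent" if item.lower().startswith("urgent") else "normal"


def process_urgent(item: str) -> str:
    return f"URGENT-HANDLED: {item}"


def process_normal(item: str) -> str:
    return f"handled: {item}"


def route(item: str) -> str:
    # In the Functional API, branching is plain Python control flow.
    if classify(item) == "urgent":
        return process_urgent(item)
    return process_normal(item)


routed = [route(i) for i in ["URGENT: Production outage in payments", "Update README"]]
```

Because each `@task` call becomes a Temporal activity, this kind of branching stays durable without any graph-edge declarations.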

## Running the Sample

Prerequisites: `uv sync --group langgraph` and a running Temporal dev server.

```bash
# Terminal 1
uv run langgraph_plugin/functional_api/control_flow/run_worker.py

# Terminal 2
uv run langgraph_plugin/functional_api/control_flow/run_workflow.py
```

## Files

| File | Description |
|------|-------------|
| `workflow.py` | `@task` functions (validate, classify, process, summarize), `@entrypoint`, and `ControlFlowWorkflow` |
| `run_worker.py` | Registers tasks and entrypoint with `LangGraphPlugin`, starts worker |
| `run_workflow.py` | Sends a batch of items and prints processing results |
1 change: 1 addition & 0 deletions langgraph_plugin/functional_api/control_flow/__init__.py
@@ -0,0 +1 @@
"""Control flow: parallel execution, for loops, and if/else branching."""
35 changes: 35 additions & 0 deletions langgraph_plugin/functional_api/control_flow/run_worker.py
@@ -0,0 +1,35 @@
"""Worker for the control flow pipeline (Functional API)."""

import asyncio
import os

from temporalio.client import Client
from temporalio.contrib.langgraph import LangGraphPlugin
from temporalio.worker import Worker

from langgraph_plugin.functional_api.control_flow.workflow import (
ControlFlowWorkflow,
activity_options,
all_tasks,
)


async def main() -> None:
client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"))
plugin = LangGraphPlugin(
tasks=all_tasks,
activity_options=activity_options,
)

worker = Worker(
client,
task_queue="langgraph-control-flow",
workflows=[ControlFlowWorkflow],
plugins=[plugin],
)
print("Worker started. Ctrl+C to exit.")
await worker.run()


if __name__ == "__main__":
asyncio.run(main())
38 changes: 38 additions & 0 deletions langgraph_plugin/functional_api/control_flow/run_workflow.py
@@ -0,0 +1,38 @@
"""Start the control flow pipeline workflow (Functional API)."""

import asyncio
import os

from temporalio.client import Client

from langgraph_plugin.functional_api.control_flow.workflow import (
ControlFlowWorkflow,
)


async def main() -> None:
client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"))

items = [
"Fix login bug",
"URGENT: Production outage in payments",
"Update README",
"INVALID:",
"Urgent: Security patch needed",
"Refactor test suite",
]

result = await client.execute_workflow(
ControlFlowWorkflow.run,
items,
id="control-flow-workflow",
task_queue="langgraph-control-flow",
)

print(f"Summary: {result['summary']}")
for r in result["results"]:
print(f" {r}")


if __name__ == "__main__":
asyncio.run(main())