Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ opentelemetry = ["opentelemetry-api>=1.11.1,<2", "opentelemetry-sdk>=1.11.1,<2"]
pydantic = ["pydantic>=2.0.0,<3"]
openai-agents = ["openai-agents>=0.3,<0.7", "mcp>=1.9.4, <2"]
google-adk = ["google-adk>=1.27.0,<2"]
google-gemini = [
"google-genai>=1.66.0",
]

[project.urls]
Homepage = "https://github.com/temporalio/sdk-python"
Expand Down
63 changes: 63 additions & 0 deletions temporalio/contrib/google_gemini_sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""First-class Temporal integration for the Google Gemini SDK.

.. warning::
This module is experimental and may change in future versions.
Use with caution in production environments.

This integration lets you use the Gemini SDK's async client with full
automatic function calling (AFC) support, where every API call and every
tool invocation is a **durable Temporal activity**.

No credentials are fetched in the workflow, and no auth material appears in
Temporal's event history.

- :class:`GeminiPlugin` — registers the ``gemini_api_call`` activity
and owns the real ``genai.Client`` on the worker side. Pass the same args
you would pass to ``genai.Client()``.
- :func:`gemini_client` — call from a workflow to get an ``AsyncClient``
that routes API calls through activities.
- :func:`activity_as_tool` — convert any ``@activity.defn`` function into a
Gemini tool callable; Gemini's AFC invokes it as a Temporal activity.

Quickstart::

# ---- worker setup (outside sandbox) ----
plugin = GeminiPlugin(api_key=os.environ["GOOGLE_API_KEY"])

@activity.defn
async def get_weather(state: str) -> str: ...

# ---- workflow (sandbox-safe) ----
@workflow.defn
class AgentWorkflow:
@workflow.run
async def run(self, query: str) -> str:
client = gemini_client()
response = await client.models.generate_content(
model="gemini-2.5-flash",
contents=query,
config=types.GenerateContentConfig(
tools=[
activity_as_tool(
get_weather,
start_to_close_timeout=timedelta(seconds=30),
),
],
),
)
return response.text
"""

from __future__ import annotations

from temporalio.contrib.google_gemini_sdk._gemini_plugin import GeminiPlugin
from temporalio.contrib.google_gemini_sdk.workflow import (
activity_as_tool,
gemini_client,
)

__all__ = [
"GeminiPlugin",
"activity_as_tool",
"gemini_client",
]
103 changes: 103 additions & 0 deletions temporalio/contrib/google_gemini_sdk/_gemini_activity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
"""Temporal activity that executes Gemini SDK API calls with real credentials.

The ``TemporalApiClient`` in the workflow dispatches calls here. This activity
creates (or reuses) a real ``genai.Client`` and forwards the structured request.
Credentials are fetched/refreshed only within the activity — they never appear
in workflow event history.
"""

from __future__ import annotations

from typing import Any, Optional, Union

import google.auth.credentials
from google.genai import Client as GeminiClient
from google.genai.client import DebugConfig
from google.genai.types import HttpOptions, HttpOptionsDict
from google.genai.types import HttpResponse as SdkHttpResponse

from temporalio import activity

from temporalio.contrib.google_gemini_sdk._temporal_api_client import (
_GeminiApiRequest,
_GeminiApiResponse,
)


class GeminiApiCaller:
"""Holds a cached ``genai.Client`` and exposes it as a Temporal activity.

The client is created lazily on first invocation and reused for all
subsequent calls, avoiding repeated SSL context creation, connection
pool setup, and credential loading.

Args match ``genai.Client()`` constructor parameters.
"""

def __init__(
self,
*,
vertexai: Optional[bool] = None,
api_key: Optional[str] = None,
credentials: Optional[google.auth.credentials.Credentials] = None,
project: Optional[str] = None,
location: Optional[str] = None,
debug_config: Optional[DebugConfig] = None,
http_options: Optional[Union[HttpOptions, HttpOptionsDict]] = None,
) -> None:
# Build kwargs for genai.Client, omitting None values so the SDK
# can apply its own defaults.
self._gemini_client_kwargs: dict[str, Any] = {}
if vertexai is not None:
self._gemini_client_kwargs["vertexai"] = vertexai
if api_key is not None:
self._gemini_client_kwargs["api_key"] = api_key
if credentials is not None:
self._gemini_client_kwargs["credentials"] = credentials
if project is not None:
self._gemini_client_kwargs["project"] = project
if location is not None:
self._gemini_client_kwargs["location"] = location
if debug_config is not None:
self._gemini_client_kwargs["debug_config"] = debug_config
if http_options is not None:
self._gemini_client_kwargs["http_options"] = http_options

self._client: GeminiClient | None = None

def _get_client(self) -> GeminiClient:
if self._client is None:
self._client = GeminiClient(**self._gemini_client_kwargs)
return self._client

def activity(self) -> Any:
"""Return a ``gemini_api_call`` activity that closes over this instance."""

@activity.defn(name="gemini_api_call")
async def gemini_api_call(req: _GeminiApiRequest) -> _GeminiApiResponse:
"""Execute a Gemini SDK API call with real credentials.

This activity is registered automatically by
:class:`~temporalio.contrib.google_gemini_sdk.GeminiPlugin`.
Do not call it directly.
"""
client = self._get_client()
per_request_options: HttpOptions | None = (
HttpOptions.model_validate(
req.http_options_overrides.model_dump(exclude_none=True)
)
if req.http_options_overrides
else None
)
response: SdkHttpResponse = await client.aio._api_client.async_request(
http_method=req.http_method,
path=req.path,
request_dict=req.request_dict,
http_options=per_request_options,
)
return _GeminiApiResponse(
headers=response.headers or {},
body=response.body or "",
)

return gemini_api_call
110 changes: 110 additions & 0 deletions temporalio/contrib/google_gemini_sdk/_gemini_plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""Temporal plugin for Google Gemini SDK integration."""

from __future__ import annotations

import dataclasses
from typing import Any, Optional, Union

import google.auth.credentials
from google.genai.client import DebugConfig
from google.genai.types import HttpOptions, HttpOptionsDict

from temporalio.contrib.google_gemini_sdk._gemini_activity import GeminiApiCaller
from temporalio.contrib.pydantic import PydanticPayloadConverter
from temporalio.converter import DataConverter, DefaultPayloadConverter
from temporalio.plugin import SimplePlugin
from temporalio.worker import WorkflowRunner
from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner


def _data_converter(converter: DataConverter | None) -> DataConverter:
if converter is None:
return DataConverter(payload_converter_class=PydanticPayloadConverter)
elif converter.payload_converter_class is DefaultPayloadConverter:
return dataclasses.replace(
converter, payload_converter_class=PydanticPayloadConverter
)
return converter


class GeminiPlugin(SimplePlugin):
"""A Temporal Worker Plugin configured for the Google Gemini SDK.

.. warning::
This class is experimental and may change in future versions.
Use with caution in production environments.

This plugin registers the ``gemini_api_call`` activity, which owns the
real ``genai.Client`` with real credentials. Workflows use
:func:`~temporalio.contrib.google_gemini_sdk.workflow.gemini_client` to
get an ``AsyncClient`` backed by a ``TemporalApiClient`` that routes all
API calls through this activity.

No credentials are passed to or from the workflow. Auth material never
appears in Temporal's event history.

All ``genai.Client`` constructor arguments (``api_key``, ``vertexai``,
``project``, ``credentials``, etc.) are forwarded via ``**kwargs``.

Example (Gemini Developer API)::

plugin = GeminiPlugin(api_key=os.environ["GOOGLE_API_KEY"])

Example (Vertex AI)::

plugin = GeminiPlugin(
vertexai=True, project="my-project", location="us-central1",
)
"""

def __init__(
self,
*,
vertexai: Optional[bool] = None,
api_key: Optional[str] = None,
credentials: Optional[google.auth.credentials.Credentials] = None,
project: Optional[str] = None,
location: Optional[str] = None,
debug_config: Optional[DebugConfig] = None,
http_options: Optional[Union[HttpOptions, HttpOptionsDict]] = None,
) -> None:
"""Initialize the Gemini plugin.

Args:
vertexai: Whether to use Vertex AI API endpoints.
api_key: API key for the Gemini Developer API.
credentials: Google Cloud credentials for Vertex AI.
project: Google Cloud project ID for Vertex AI.
location: Google Cloud location for Vertex AI.
debug_config: Debug/testing configuration for the ``genai.Client``.
http_options: HTTP options (timeout, base_url, api_version, etc.)
forwarded to the ``genai.Client`` inside the activity.
"""
self._api_caller = GeminiApiCaller(
vertexai=vertexai,
api_key=api_key,
credentials=credentials,
project=project,
location=location,
debug_config=debug_config,
http_options=http_options,
)

def workflow_runner(runner: WorkflowRunner | None) -> WorkflowRunner:
if not runner:
raise ValueError("No WorkflowRunner provided to GeminiPlugin.")
if isinstance(runner, SandboxedWorkflowRunner):
return dataclasses.replace(
runner,
restrictions=runner.restrictions.with_passthrough_modules(
"google.genai"
),
)
return runner

super().__init__(
name="GeminiPlugin",
data_converter=_data_converter,
activities=[self._api_caller.activity()],
workflow_runner=workflow_runner,
)
Loading