diff --git a/api/oss/src/apis/fastapi/tools/models.py b/api/oss/src/apis/fastapi/tools/models.py index 891b276c22..86c62dcec6 100644 --- a/api/oss/src/apis/fastapi/tools/models.py +++ b/api/oss/src/apis/fastapi/tools/models.py @@ -1,6 +1,12 @@ -from typing import List, Optional, Union +from typing import Any, List, Optional, Union -from pydantic import BaseModel +from agenta.sdk.agents.tools import ( + BuiltinToolConfig, + GatewayToolConfig, + ToolConfigurationError, + coerce_tool_configs, +) +from pydantic import BaseModel, Field, field_validator from oss.src.core.tools.dtos import ( # Tool Catalog @@ -15,6 +21,9 @@ ToolConnectionCreate, # Tool Calls ToolResult, + # Agent tools + AgentToolReference, + ResolvedAgentTool, ) @@ -87,3 +96,34 @@ class ToolConnectionsResponse(BaseModel): class ToolCallResponse(BaseModel): call: ToolResult + + +# --------------------------------------------------------------------------- +# Agent tool resolution +# --------------------------------------------------------------------------- + + +class ToolResolveRequest(BaseModel): + tools: List[AgentToolReference] = Field(default_factory=list) + + @field_validator("tools", mode="before") + @classmethod + def _coerce_tools(cls, value: Any) -> List[AgentToolReference]: + try: + configs = coerce_tool_configs(value or []).tool_configs + except ToolConfigurationError as exc: + raise ValueError(str(exc)) from exc + unsupported = [ + config + for config in configs + if not isinstance(config, (BuiltinToolConfig, GatewayToolConfig)) + ] + if unsupported: + raise ValueError("/tools/resolve accepts only builtin and gateway tools") + return configs + + +class ToolResolveResponse(BaseModel): + count: int = 0 + builtins: List[str] = Field(default_factory=list) + custom: List[ResolvedAgentTool] = Field(default_factory=list) diff --git a/api/oss/src/apis/fastapi/tools/router.py b/api/oss/src/apis/fastapi/tools/router.py index 043d114fa7..3cc689a055 100644 --- a/api/oss/src/apis/fastapi/tools/router.py 
+++ b/api/oss/src/apis/fastapi/tools/router.py @@ -29,6 +29,9 @@ ToolConnectionsResponse, # ToolCallResponse, + # + ToolResolveRequest, + ToolResolveResponse, ) from oss.src.core.shared.dtos import Status @@ -42,10 +45,12 @@ ToolResultData, ) from oss.src.core.tools.exceptions import ( + ActionNotFoundError, AdapterError, ConnectionInactiveError, ConnectionInvalidError, ConnectionNotFoundError, + ToolSlugInvalidError, ) from oss.src.core.tools.service import ( ToolsService, @@ -208,6 +213,14 @@ def __init__( ) # --- Tool operations --- + self.router.add_api_route( + "/resolve", + self.resolve_tools, + methods=["POST"], + operation_id="resolve_agent_tools", + response_model=ToolResolveResponse, + response_model_exclude_none=True, + ) self.router.add_api_route( "/call", self.call_tool, @@ -886,6 +899,51 @@ async def callback_connection( # Tool Calls # ----------------------------------------------------------------------- + @intercept_exceptions() + @handle_adapter_exceptions() + async def resolve_tools( + self, + request: Request, + *, + body: ToolResolveRequest, + ) -> ToolResolveResponse: + """Resolve an agent's tool references into model-ready specs. + + Validates Composio connections up front and enriches each action from the + catalog, so a running agent (e.g. Pi) gets ``customTools`` whose ``execute`` + routes back through ``POST /tools/call`` — provider keys stay server-side. 
+ """ + if is_ee(): + has_permission = await check_action_access( + user_uid=request.state.user_id, + project_id=request.state.project_id, + permission=Permission.VIEW_TOOLS, + ) + if not has_permission: + raise FORBIDDEN_EXCEPTION + + try: + resolution = await self.tools_service.resolve_agent_tools( + project_id=UUID(request.state.project_id), + tools=body.tools, + ) + except ConnectionNotFoundError as e: + raise HTTPException(status_code=404, detail=e.message) from e + except ConnectionInactiveError as e: + raise HTTPException(status_code=400, detail=e.message) from e + except ConnectionInvalidError as e: + raise HTTPException(status_code=400, detail=e.message) from e + except ToolSlugInvalidError as e: + raise HTTPException(status_code=400, detail=e.message) from e + except ActionNotFoundError as e: + raise HTTPException(status_code=404, detail=e.message) from e + + return ToolResolveResponse( + count=len(resolution.builtins) + len(resolution.custom), + builtins=resolution.builtins, + custom=resolution.custom, + ) + @intercept_exceptions() @handle_adapter_exceptions() async def call_tool( @@ -931,39 +989,12 @@ async def call_tool( connection_slug = slug_parts[4] try: - connections = await self.tools_service.query_connections( + connection = await self.tools_service.resolve_connection_by_slug( project_id=UUID(request.state.project_id), provider_key=provider_key, integration_key=integration_key, + connection_slug=connection_slug, ) - - connection = next( - (c for c in connections if c.slug == connection_slug), None - ) - - if not connection: - raise ConnectionNotFoundError( - connection_slug=connection_slug, - provider_key=provider_key, - integration_key=integration_key, - ) - - if not connection.is_active: - raise ConnectionInactiveError(connection_id=connection_slug) - - if not connection.is_valid: - raise ConnectionInvalidError( - connection_slug=connection_slug, - detail="Please refresh the connection.", - ) - - if not connection.provider_connection_id: - 
raise ConnectionNotFoundError( - connection_slug=connection_slug, - provider_key=provider_key, - integration_key=integration_key, - ) - except ConnectionNotFoundError as e: raise HTTPException(status_code=404, detail=e.message) from e except ConnectionInactiveError as e: diff --git a/api/oss/src/core/tools/dtos.py b/api/oss/src/core/tools/dtos.py index a588965f61..224a956227 100644 --- a/api/oss/src/core/tools/dtos.py +++ b/api/oss/src/core/tools/dtos.py @@ -1,8 +1,9 @@ from enum import Enum -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union +from agenta.sdk.agents.tools import BuiltinToolConfig, GatewayToolConfig from agenta.sdk.models.workflows import JsonSchemas -from pydantic import BaseModel +from pydantic import BaseModel, Field from oss.src.core.shared.dtos import ( Header, @@ -124,6 +125,13 @@ def provider_connection_id(self) -> Optional[str]: ) return None + @property + def is_no_auth(self) -> bool: + """True for a no-auth toolkit connection (no Composio auth config/account).""" + return bool( + self.data and isinstance(self.data, dict) and self.data.get("no_auth") + ) + @property def is_active(self) -> bool: """Check if connection is active (not deleted).""" @@ -227,7 +235,7 @@ class ToolExecutionRequest(BaseModel): integration_key: str action_key: str - provider_connection_id: str + provider_connection_id: Optional[str] = None # absent for no-auth toolkits user_id: Optional[str] = None arguments: Dict[str, Any] = {} @@ -238,3 +246,42 @@ class ToolExecutionResponse(BaseModel): data: Optional[Json] = None error: Optional[str] = None successful: bool = False + + +# --------------------------------------------------------------------------- +# Agent tools (config references + resolution) +# --------------------------------------------------------------------------- + +# A provider-agnostic list of tool references lives under an agent revision's +# ``parameters["tools"]``. 
Each entry is a discriminated union on ``type``: config +# holds references and display metadata only, never secrets. The backend resolves +# them into model-ready specs at invoke time (see ToolsService.resolve_agent_tools). + + +AgentBuiltinTool = BuiltinToolConfig +AgentComposioTool = GatewayToolConfig +AgentToolReference = Union[BuiltinToolConfig, GatewayToolConfig] + + +class ResolvedAgentTool(BaseModel): + """A runnable reference resolved into a model-ready tool spec. + + ``call_ref`` is the ``tools.{provider}.{integration}.{action}.{connection}`` slug + the execution bridge sends back to ``POST /tools/call``. + """ + + name: str + description: Optional[str] = None + input_schema: Optional[Dict[str, Any]] = None + call_ref: str + + +class AgentToolsResolution(BaseModel): + """Outcome of resolving an agent's ``tools`` list. + + ``builtins`` pass straight into Pi's ``tools: string[]``; ``custom`` become Pi + ``customTools`` whose ``execute`` routes through ``/tools/call``. + """ + + builtins: List[str] = Field(default_factory=list) + custom: List[ResolvedAgentTool] = Field(default_factory=list) diff --git a/api/oss/src/core/tools/exceptions.py b/api/oss/src/core/tools/exceptions.py index f46c08b6cd..e9dbd54f3f 100644 --- a/api/oss/src/core/tools/exceptions.py +++ b/api/oss/src/core/tools/exceptions.py @@ -40,6 +40,24 @@ def __init__( super().__init__(msg) +class ActionNotFoundError(ToolsError): + """Raised when a catalog action cannot be found for an integration.""" + + def __init__( + self, + *, + provider_key: str, + integration_key: str, + action_key: str, + ): + self.provider_key = provider_key + self.integration_key = integration_key + self.action_key = action_key + super().__init__( + f"Action not found: {provider_key}/{integration_key}/{action_key}" + ) + + class ConnectionSlugConflictError(ToolsError): """Raised when a connection slug already exists for the integration.""" diff --git a/api/oss/src/core/tools/providers/composio/adapter.py 
b/api/oss/src/core/tools/providers/composio/adapter.py index f90ab9aa8e..80fcdee100 100644 --- a/api/oss/src/core/tools/providers/composio/adapter.py +++ b/api/oss/src/core/tools/providers/composio/adapter.py @@ -24,6 +24,18 @@ COMPOSIO_DEFAULT_API_URL = "https://backend.composio.dev/api/v3" +def _is_no_auth_toolkit(toolkit: Dict[str, Any]) -> bool: + """A toolkit needs no auth when every auth_config_details entry is NO_AUTH. + + Composio's GET /toolkits/{slug} reports a single ``{"mode": "NO_AUTH"}`` entry + for toolkits like ``codeinterpreter`` and the ``composio`` meta-toolkit. + """ + details = toolkit.get("auth_config_details") or [] + if not details: + return False + return all((detail.get("mode") or "").upper() == "NO_AUTH" for detail in details) + + class ComposioToolsAdapter(ComposioCatalogClient, ToolsGatewayInterface): """Composio V3 API adapter — uses httpx directly (no SDK). @@ -199,15 +211,25 @@ async def initiate_connection( detail=str(e), ) from e - # Step 2: create an auth config for this integration. - # api_key → use_custom_auth; Composio's redirect UI collects the credentials. - # oauth / None → use_composio_managed_auth. log.info( "initiate_connection: integration_key=%s auth_scheme=%r", integration_key, auth_scheme, ) + # No-auth toolkits (e.g. codeinterpreter) reject auth-config creation with a + # 400. They need neither an auth config nor a connected account — their tools + # execute directly. Return a marked connection the service persists as valid. + if _is_no_auth_toolkit(toolkit): + return ToolConnectionResponse( + provider_connection_id="", + redirect_url=None, + connection_data={"no_auth": True}, + ) + + # Step 2: create an auth config for this integration. + # api_key → use_custom_auth; Composio's redirect UI collects the credentials. + # oauth / None → use_composio_managed_auth. if auth_scheme == "api_key": # Derive Composio authScheme from toolkit's auth_config_details. # Fall back to "API_KEY" as the common default. 
@@ -388,10 +410,10 @@ async def execute( action_key=request.action_key, ) - payload: Dict[str, Any] = { - "arguments": request.arguments, - "connected_account_id": request.provider_connection_id, - } + payload: Dict[str, Any] = {"arguments": request.arguments} + # No-auth toolkits run without a connected account; only send the id when set. + if request.provider_connection_id: + payload["connected_account_id"] = request.provider_connection_id if request.user_id: payload["user_id"] = request.user_id diff --git a/api/oss/src/core/tools/service.py b/api/oss/src/core/tools/service.py index f603bc4d42..03c8244f5d 100644 --- a/api/oss/src/core/tools/service.py +++ b/api/oss/src/core/tools/service.py @@ -1,3 +1,4 @@ +import re from typing import Any, Dict, List, Optional, Tuple from uuid import UUID @@ -6,6 +7,11 @@ from oss.src.core.tools.utils import make_oauth_state from oss.src.core.tools.dtos import ( + AgentBuiltinTool, + AgentComposioTool, + AgentToolReference, + AgentToolsResolution, + ResolvedAgentTool, ToolCatalogAction, ToolCatalogActionDetails, ToolCatalogIntegration, @@ -15,17 +21,27 @@ ToolConnectionRequest, ToolExecutionRequest, ToolExecutionResponse, + ToolProviderKind, ) from oss.src.core.tools.interfaces import ( ToolsDAOInterface, ) from oss.src.core.tools.registry import ToolsGatewayRegistry from oss.src.core.tools.exceptions import ( + ActionNotFoundError, ConnectionInactiveError, + ConnectionInvalidError, ConnectionNotFoundError, + ToolSlugInvalidError, ) +# A slug segment is safe for the ``tools.{provider}.{integration}.{action}.{connection}`` +# call-ref. ``__`` is forbidden because ``/tools/call`` round-trips ``__`` <-> ``.`` when +# parsing function names, so a ``__`` inside a segment would corrupt the split. 
+_SLUG_SEGMENT_RE = re.compile(r"^[a-zA-Z0-9-]+(?:_[a-zA-Z0-9-]+)*\Z")  # \Z, not $: "$" would also accept a segment ending in a newline + + log = get_module_logger(__name__) @@ -243,6 +259,16 @@ async def create_connection( data["project_id"] = str(project_id) connection_create.data = data # type: ignore[assignment] + # Connection validity is server-owned, never client-supplied. An auth-backed + # connection is not valid until its flow completes (the OAuth callback flips + # is_valid). A no-auth toolkit has no flow, so the server marks it valid up front, + # but only after the adapter confirmed it is no-auth. Drop any client-sent flags so a + # caller cannot mark a pending OAuth connection valid. + connection_create.flags = { # type: ignore[assignment] + "is_active": True, + "is_valid": bool(data.get("no_auth")), + } + # Persist locally return await self.tools_dao.create_connection( project_id=project_id, @@ -331,6 +357,11 @@ async def refresh_connection( connection_id=str(connection_id), ) + # A no-auth connection has no provider-side authorization to re-link, so refresh is a + # no-op. Return it unchanged rather than reporting it missing. 
+ if conn.is_no_auth: + return conn + if not conn.provider_connection_id: raise ConnectionNotFoundError( connection_id=str(connection_id), @@ -392,7 +423,7 @@ async def execute_tool( provider_key: str, integration_key: str, action_key: str, - provider_connection_id: str, + provider_connection_id: Optional[str] = None, user_id: Optional[str] = None, arguments: Dict[str, Any], ) -> ToolExecutionResponse: @@ -408,3 +439,148 @@ async def execute_tool( arguments=arguments, ), ) + + # ----------------------------------------------------------------------- + # Connection resolution (shared by the call endpoint and the agent resolver) + # ----------------------------------------------------------------------- + + async def resolve_connection_by_slug( + self, + *, + project_id: UUID, + provider_key: str, + integration_key: str, + connection_slug: str, + ) -> ToolConnection: + """Resolve a project-scoped connection slug to a usable connection row. + + Raises a domain exception when the connection is missing, inactive, invalid, + or never finished its provider handshake. Shared by ``call_tool`` (execution) + and ``resolve_agent_tools`` (up-front validation). + """ + # Query all (not active-only) so an inactive connection yields a precise + # "inactive" error instead of an indistinguishable "not found". 
+ connections = await self.query_connections( + project_id=project_id, + provider_key=provider_key, + integration_key=integration_key, + is_active=None, + ) + + connection = next( + (c for c in connections if c.slug == connection_slug), + None, + ) + + if not connection: + raise ConnectionNotFoundError( + provider_key=provider_key, + integration_key=integration_key, + connection_slug=connection_slug, + ) + + if not connection.is_active: + raise ConnectionInactiveError(connection_id=connection_slug) + + if not connection.is_valid: + raise ConnectionInvalidError( + connection_slug=connection_slug, + detail="Please refresh the connection.", + ) + + # No-auth toolkits have no provider-side connected account; the missing id is + # expected and execution runs without one. + if not connection.is_no_auth and not connection.provider_connection_id: + raise ConnectionNotFoundError( + provider_key=provider_key, + integration_key=integration_key, + connection_slug=connection_slug, + ) + + return connection + + # ----------------------------------------------------------------------- + # Agent tool resolution + # ----------------------------------------------------------------------- + + async def resolve_agent_tools( + self, + *, + project_id: UUID, + tools: List[AgentToolReference], + ) -> AgentToolsResolution: + """Resolve an agent's tool references into model-ready specs. + + ``builtin`` references pass through as names. ``composio`` references are + validated against the project's connections up front and enriched from the + catalog (description + input schema), so the model never sees a stale schema + and the invoke fails fast on a missing/invalid connection rather than mid-loop. 
+ """ + builtins: List[str] = [] + custom: List[ResolvedAgentTool] = [] + + for ref in tools: + if isinstance(ref, AgentBuiltinTool): + if ref.name: + builtins.append(ref.name) + continue + + if isinstance(ref, AgentComposioTool): + custom.append( + await self._resolve_composio_tool( + project_id=project_id, + ref=ref, + ) + ) + + return AgentToolsResolution(builtins=builtins, custom=custom) + + async def _resolve_composio_tool( + self, + *, + project_id: UUID, + ref: AgentComposioTool, + ) -> ResolvedAgentTool: + provider_key = ToolProviderKind.COMPOSIO.value + + for segment in (ref.integration, ref.action, ref.connection): + if not _SLUG_SEGMENT_RE.match(segment): + raise ToolSlugInvalidError( + slug=f"{provider_key}.{ref.integration}.{ref.action}.{ref.connection}", + detail=f"Invalid slug segment: {segment!r}", + ) + + # Fail fast if the connection is missing/inactive/invalid for this project. + await self.resolve_connection_by_slug( + project_id=project_id, + provider_key=provider_key, + integration_key=ref.integration, + connection_slug=ref.connection, + ) + + action = await self.get_action( + provider_key=provider_key, + integration_key=ref.integration, + action_key=ref.action, + ) + if not action: + raise ActionNotFoundError( + provider_key=provider_key, + integration_key=ref.integration, + action_key=ref.action, + ) + + input_schema = ( + action.schemas.inputs if action.schemas and action.schemas.inputs else None + ) + name = ref.name or f"{ref.integration}__{ref.action}" + call_ref = ( + f"tools.{provider_key}.{ref.integration}.{ref.action}.{ref.connection}" + ) + + return ResolvedAgentTool( + name=name, + description=action.description, + input_schema=input_schema, + call_ref=call_ref, + ) diff --git a/api/oss/tests/pytest/unit/tools/__init__.py b/api/oss/tests/pytest/unit/tools/__init__.py new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/api/oss/tests/pytest/unit/tools/__init__.py @@ -0,0 +1 @@ + diff --git 
a/api/oss/tests/pytest/unit/tools/test_agent_resolution.py b/api/oss/tests/pytest/unit/tools/test_agent_resolution.py new file mode 100644 index 0000000000..12ad49266a --- /dev/null +++ b/api/oss/tests/pytest/unit/tools/test_agent_resolution.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +from types import SimpleNamespace +from uuid import uuid4 + +import pytest +from pydantic import ValidationError + +from agenta.sdk.agents.tools import BuiltinToolConfig, GatewayToolConfig + +from oss.src.apis.fastapi.tools.models import ToolResolveRequest +from oss.src.core.tools.dtos import AgentBuiltinTool, AgentComposioTool +from oss.src.core.tools.service import ToolsService + + +def test_api_reuses_sdk_tool_config_classes(): + assert AgentBuiltinTool is BuiltinToolConfig + assert AgentComposioTool is GatewayToolConfig + + +def test_resolve_request_coerces_legacy_composio_shape(): + request = ToolResolveRequest( + tools=[ + "read", + { + "type": "composio", + "integration": "github", + "action": "GET_USER", + "connection": "c1", + }, + ] + ) + assert isinstance(request.tools[0], BuiltinToolConfig) + assert isinstance(request.tools[1], GatewayToolConfig) + + +def test_resolve_request_rejects_non_gateway_runtime_tools(): + with pytest.raises(ValidationError, match="only builtin and gateway"): + ToolResolveRequest( + tools=[ + { + "type": "code", + "name": "calc", + "script": "...", + } + ] + ) + + +async def test_api_resolution_returns_stable_call_reference(monkeypatch): + service = object.__new__(ToolsService) + + async def _connection(**_kwargs): + return object() + + async def _action(**_kwargs): + return SimpleNamespace( + description="Get user", + schemas=SimpleNamespace( + inputs={"type": "object", "properties": {}}, + ), + ) + + monkeypatch.setattr(service, "resolve_connection_by_slug", _connection) + monkeypatch.setattr(service, "get_action", _action) + + result = await service.resolve_agent_tools( + project_id=uuid4(), + tools=[ + 
BuiltinToolConfig(name="read"), + GatewayToolConfig( + integration="github", + action="GET_USER", + connection="c1", + ), + ], + ) + assert result.builtins == ["read"] + assert result.custom[0].call_ref == "tools.composio.github.GET_USER.c1" diff --git a/api/oss/tests/pytest/unit/tools/test_no_auth_connection.py b/api/oss/tests/pytest/unit/tools/test_no_auth_connection.py new file mode 100644 index 0000000000..da6f1e3052 --- /dev/null +++ b/api/oss/tests/pytest/unit/tools/test_no_auth_connection.py @@ -0,0 +1,280 @@ +from __future__ import annotations + +from uuid import uuid4 + +import pytest + +from oss.src.core.tools import service as service_mod +from oss.src.core.tools.dtos import ( + ToolConnection, + ToolConnectionCreate, + ToolConnectionRequest, + ToolConnectionResponse, + ToolExecutionRequest, + ToolProviderKind, +) +from oss.src.core.tools.exceptions import ConnectionNotFoundError +from oss.src.core.tools.providers.composio.adapter import ( + ComposioToolsAdapter, + _is_no_auth_toolkit, +) +from oss.src.core.tools.service import ToolsService + + +_NO_AUTH_TOOLKIT = { + "slug": "codeinterpreter", + "auth_config_details": [{"name": "CodeInterpreter", "mode": "NO_AUTH"}], +} +_OAUTH_TOOLKIT = { + "slug": "github", + "auth_config_details": [{"name": "GitHub", "mode": "OAUTH2"}], +} + + +def test_is_no_auth_toolkit_detects_no_auth(): + assert _is_no_auth_toolkit(_NO_AUTH_TOOLKIT) is True + assert _is_no_auth_toolkit(_OAUTH_TOOLKIT) is False + assert _is_no_auth_toolkit({"auth_config_details": []}) is False + assert _is_no_auth_toolkit({}) is False + + +async def test_initiate_connection_skips_auth_config_for_no_auth(monkeypatch): + """No-auth toolkit must not POST /auth_configs (Composio rejects that with 400).""" + adapter = object.__new__(ComposioToolsAdapter) + + posted: list[str] = [] + + async def _get(path, *, params=None): + assert path == "/toolkits/codeinterpreter" + return _NO_AUTH_TOOLKIT + + async def _post(path, *, json=None): + 
posted.append(path) + return {} + + monkeypatch.setattr(adapter, "_get", _get) + monkeypatch.setattr(adapter, "_post", _post) + + result = await adapter.initiate_connection( + request=ToolConnectionRequest( + user_id="proj", + integration_key="codeinterpreter", + ), + ) + + assert posted == [] # no /auth_configs, no /connected_accounts/link + assert result.provider_connection_id == "" + assert result.redirect_url is None + assert result.connection_data == {"no_auth": True} + + +async def test_execute_omits_connected_account_for_no_auth(monkeypatch): + """A blank provider_connection_id must not be sent as connected_account_id.""" + adapter = object.__new__(ComposioToolsAdapter) + + sent: dict = {} + + async def _post(path, *, json=None): + sent["path"] = path + sent["json"] = json + return {"data": {"stdout": "42\n"}, "successful": True, "error": None} + + monkeypatch.setattr(adapter, "_post", _post) + + await adapter.execute( + request=ToolExecutionRequest( + integration_key="codeinterpreter", + action_key="EXECUTE_CODE", + provider_connection_id=None, + arguments={"code_to_execute": "print(6*7)"}, + ), + ) + + assert sent["path"] == "/tools/execute/CODEINTERPRETER_EXECUTE_CODE" + assert "connected_account_id" not in sent["json"] + assert sent["json"]["arguments"] == {"code_to_execute": "print(6*7)"} + + +def _no_auth_connection() -> ToolConnection: + return ToolConnection( + id=uuid4(), + slug="qa-codeinterp", + provider_key=ToolProviderKind.COMPOSIO, + integration_key="codeinterpreter", + data={"no_auth": True, "project_id": str(uuid4())}, + flags={"is_active": True, "is_valid": True}, + ) + + +def test_no_auth_connection_flags_and_helpers(): + conn = _no_auth_connection() + assert conn.is_no_auth is True + assert conn.is_active is True + assert conn.is_valid is True + assert conn.provider_connection_id is None + + +async def test_resolve_connection_by_slug_accepts_no_auth(monkeypatch): + """A no-auth connection resolves despite having no 
provider_connection_id.""" + service = object.__new__(ToolsService) + conn = _no_auth_connection() + + async def _query(**_kwargs): + return [conn] + + monkeypatch.setattr(service, "query_connections", _query) + + resolved = await service.resolve_connection_by_slug( + project_id=uuid4(), + provider_key="composio", + integration_key="codeinterpreter", + connection_slug="qa-codeinterp", + ) + assert resolved is conn + + +async def test_resolve_connection_by_slug_rejects_authful_without_provider_id( + monkeypatch, +): + """An auth toolkit with no provider connection id still fails (regression guard).""" + service = object.__new__(ToolsService) + conn = ToolConnection( + id=uuid4(), + slug="gh", + provider_key=ToolProviderKind.COMPOSIO, + integration_key="github", + data={"project_id": str(uuid4())}, + flags={"is_active": True, "is_valid": True}, + ) + + async def _query(**_kwargs): + return [conn] + + monkeypatch.setattr(service, "query_connections", _query) + + with pytest.raises(ConnectionNotFoundError): + await service.resolve_connection_by_slug( + project_id=uuid4(), + provider_key="composio", + integration_key="github", + connection_slug="gh", + ) + + +# --- must-fix follow-ups from the F-011 review (server-owned flags + edges) --- + + +async def _capture_created_flags( + monkeypatch, *, no_auth: bool, client_flags: dict +) -> dict: + """Run create_connection with a faked provider and DAO; return the persisted flags/data.""" + service = object.__new__(ToolsService) + captured: dict = {} + + class _Adapter: + async def initiate_connection(self, *, request): + if no_auth: + return ToolConnectionResponse( + provider_connection_id="", + redirect_url=None, + connection_data={"no_auth": True}, + ) + return ToolConnectionResponse( + provider_connection_id="acc_pending", + redirect_url="https://composio/redirect", + connection_data={"connected_account_id": "acc_pending"}, + ) + + class _Registry: + def get(self, _key): + return _Adapter() + + class _Dao: + async def 
create_connection(self, *, project_id, user_id, connection_create): + captured["flags"] = dict(connection_create.flags or {}) + captured["data"] = dict(connection_create.data or {}) + return connection_create + + class _FakeEnv: + class agenta: + crypt_key = "x" * 32 + api_url = "http://test" + + service.adapter_registry = _Registry() + service.tools_dao = _Dao() + monkeypatch.setattr(service_mod, "env", _FakeEnv) + monkeypatch.setattr(service_mod, "make_oauth_state", lambda **_: "state") + + cc = ToolConnectionCreate( + slug="c1", + provider_key=ToolProviderKind.COMPOSIO, + integration_key="codeinterpreter" if no_auth else "github", + flags=client_flags, + ) + await service.create_connection( + project_id=uuid4(), user_id=uuid4(), connection_create=cc + ) + return captured + + +async def test_create_connection_overrides_client_flags_for_auth_toolkit(monkeypatch): + """A client cannot mark an auth-backed connection valid before its flow completes.""" + captured = await _capture_created_flags( + monkeypatch, no_auth=False, client_flags={"is_valid": True, "is_active": True} + ) + assert captured["flags"]["is_valid"] is False + assert captured["flags"]["is_active"] is True + + +async def test_create_connection_marks_no_auth_valid(monkeypatch): + """A no-auth connection is server-marked valid up front (no flow to wait for).""" + captured = await _capture_created_flags(monkeypatch, no_auth=True, client_flags={}) + assert captured["flags"]["is_valid"] is True + assert captured["flags"]["is_active"] is True + assert captured["data"].get("no_auth") is True + + +def test_is_no_auth_toolkit_mixed_mode_is_authful(): + """A toolkit that mixes NO_AUTH with a real scheme must stay auth-backed.""" + mixed = { + "slug": "x", + "auth_config_details": [{"mode": "NO_AUTH"}, {"mode": "OAUTH2"}], + } + assert _is_no_auth_toolkit(mixed) is False + + +async def test_execute_sends_connected_account_when_present(monkeypatch): + """Auth regression: a real provider connection id is still 
sent as connected_account_id.""" + adapter = object.__new__(ComposioToolsAdapter) + sent: dict = {} + + async def _post(path, *, json=None): + sent["json"] = json + return {"data": {"login": "ok"}, "successful": True, "error": None} + + monkeypatch.setattr(adapter, "_post", _post) + + await adapter.execute( + request=ToolExecutionRequest( + integration_key="github", + action_key="GET_THE_AUTHENTICATED_USER", + provider_connection_id="acc_123", + arguments={}, + ), + ) + assert sent["json"]["connected_account_id"] == "acc_123" + + +async def test_refresh_connection_no_auth_is_noop(): + """Refreshing a no-auth connection is a no-op, not a not-found error.""" + service = object.__new__(ToolsService) + conn = _no_auth_connection() + + class _Dao: + async def get_connection(self, *, project_id, connection_id): + return conn + + service.tools_dao = _Dao() + + out = await service.refresh_connection(project_id=uuid4(), connection_id=conn.id) + assert out is conn diff --git a/docs/design/agent-workflows/tool-resolution-layering/plan.md b/docs/design/agent-workflows/tool-resolution-layering/plan.md new file mode 100644 index 0000000000..99b2b68877 --- /dev/null +++ b/docs/design/agent-workflows/tool-resolution-layering/plan.md @@ -0,0 +1,248 @@ +# Tool & secret resolution: the SDK / service / backend responsibility split + +Status: v2, decisions folded in (2026-06-23). Branch: `feat/agent-service` (PR #4772). +v1 was the open-questions draft; this version records the decisions from the review with +Codex and the author's comments, so the open list is now small. + +## What we want to change + +Two related problems: + +1. **Resolution lives in the service, not the SDK.** Today the Agenta service resolves + gateway tools and secrets. The next consumer is an SDK user running a local backend (Pi or + Claude on their machine). They need the same gateway-tool and secret resolution. So the + resolution code must live in the SDK and be imported by the service, not the reverse. + +2. 
**The layer boundaries are unclear.** We want a clean split: + - The SDK / agent-to-service boundary **resolves information**. It turns neutral tool + declarations into runnable tool specs and turns secret names into values, then hands + those to the backend. + - The backend **decides how to execute**: whether a tool becomes an HTTP callback, a file + written into a sandbox, or something run in the cloud. That is a backend concern, not a + resolver concern. + + Example: a local backend running Pi has no callbacks for code tools. A code tool gets + written as a file in a folder Pi can run, or shipped to the cloud and handled there. + Whether to use callbacks at all is the backend's call. The resolver just provides the + resolved tool and its secrets. + +## What the code looks like today (grounded) + +Most of the resolution machinery is already in the SDK. The split that remains is small. + +### Already in the SDK (`sdks/python/agenta/sdk/agents/`) + +- **Config + spec models**, `tools/models.py`: `ToolConfig` (builtin/gateway/code/client), + `ToolSpec` (`CallbackToolSpec`/`CodeToolSpec`/`ClientToolSpec`), `ResolvedToolSet`, + `ToolCallback`, `GatewayToolResolution`. +- **The orchestrator**, `tools/resolver.py`: `ToolResolver` splits configs by type, resolves + code-tool secrets through an injected `ToolSecretProvider`, builds code/client specs, and + delegates gateway configs to an injected `GatewayToolResolver`. Default + `EnvironmentToolSecretProvider` reads the process env (the offline default). +- **The ports**, `tools/interfaces.py`: `ToolSecretProvider.get_many(names)` and + `GatewayToolResolver.resolve(tools)` (both Protocols). +- **MCP resolution**, `mcp/` (`MCPResolver`, `parse_mcp_server_configs`, `ResolvedMCPServer`). 
+- **The session/backend contracts**, `dtos.py` (`SessionConfig`, `HarnessAgentConfig`, + `PiAgentConfig`/`ClaudeAgentConfig`/`AgentaAgentConfig`), `interfaces.py` (`Backend`, + `Environment`, `Harness`, `Session`), harness adapters in `adapters/harnesses.py`. +- **The local backend seam**, `adapters/local.py`: `LocalBackend` exists but its methods + raise `NotImplementedError` (Phase 3/4 work, out of scope here). + +### Still in the service (`services/oss/src/agent/`), the platform-backed implementations + +- `client.py`: `agenta_api_base()` + `request_authorization()` + `TOOLS_TIMEOUT`. Derives the + base URL from `ag.tracing.otlp_url` (or env) and the auth from the tracing-propagation + `inject({})` (or `AGENTA_API_KEY`). +- `tools/gateway.py`: `AgentaGatewayToolResolver`, the `GatewayToolResolver` impl that POSTs + `/tools/resolve` and builds a `ToolCallback(endpoint=".../tools/call", auth)`. +- `tools/secrets.py`: `VaultToolSecretProvider` + `resolve_named_secrets`, the + `ToolSecretProvider` impl that POSTs `/secrets/resolve`. +- `secrets.py`: `resolve_harness_secrets()`, which GETs `/secrets/` and maps `provider_key` + vault entries to harness env vars (`OPENAI_API_KEY`, ...). This is the harness/model secret + path (LLM provider keys), distinct from the tool/named-secret path above. +- `tools/resolver.py`: `resolve_agent_resources()`, the composition that wires the SDK + `ToolResolver` + `MCPResolver` to the service providers, with the MCP flag gate + (`AGENTA_AGENT_ENABLE_MCP`). + +### Facts that shaped the plan + +- **`client.py` already uses only SDK primitives** (otlp/env base URL, propagation/env auth), + so moving the platform-backed resolvers into the SDK is mostly relocation. 
+- **The SDK already has a vault + API-client convention.** `middlewares/running/vault.py` + fetches `/secrets/` provider keys (with caching, local-env keys, permission checks), + sourcing the host from the singleton `api_url` and the credential from + `RunningContext.credentials`. The agent service re-implements the same fetch by hand. There + is a canonical pattern to align on, which answers the `gateway.py:54` "right patterns?" + comment. +- **Named-secret resolution (`/secrets/resolve`) does not exist yet in the API.** We treat it + as in-flight: the PSE-backed endpoint is being built per the vault-named-secrets design, so + this plan assumes it exists and ships a real named-secret provider, not a no-op. +- **`ToolCallback` is currently runner wire transport.** The gateway resolver builds it, + `ToolResolver` copies it into `ResolvedToolSet`, and Pi/Claude serialize it straight into + the `/run` payload (`dtos.py` `wire_tools`). See the D2 decision for why we keep it there + anyway. + +## The credential rule (load-bearing, drives D4) + +The Agenta service is one process serving many users with different projects, credentials, +and trace endpoints. So **per-user authorization can never come from a process-global** such +as `ag.DEFAULT_AGENTA_SINGLETON_INSTANCE`. A pure-singleton approach would leak one user's +auth into another's request. + +The working split, already used by `vault.py`: + +- **Base URL / host** comes from the singleton (or env). It is the same backend for everyone, + so a global is correct. +- **The caller's credential** comes from per-request context: `RunningContext.credentials`, + or the tracing-propagation `inject()` the agent uses today. Both are per-request, so each + call carries its own auth. + +This is the invariant the whole relocation must preserve. 
Before we rely on +`RunningContext`, we must confirm it is populated on the agent `/invoke` and `/messages` +routes (Codex traced that both call `wf.invoke(..., credentials=...)`, but the current +`secrets.py` docstring claims context does not reach the route). That confirmation is a +required route-level test, not an assumption (Phase E). + +## Target architecture: two responsibilities + +### Responsibility 1: Resolution (the SDK / boundary owns this) + +Turn a neutral `AgentConfig` (tool declarations + secret names + MCP servers) into resolved, +runnable specs and secret values. It calls the Agenta platform when needed (`/tools/resolve`, +`/secrets/resolve`, `/secrets/`). It lives in the SDK so the service and a local-backend SDK +user run the same code. + +Exposed as **three separate entrypoints** (no aggregate): + +- `resolve_tools(tools)`: builtin names, code specs, client specs, and gateway callback specs. + Code-tool named secrets are resolved inside this call via the injected `ToolSecretProvider`. +- `resolve_mcp(mcp_servers)`: resolved MCP servers. MCP named secrets are resolved inside this + call via the same `ToolSecretProvider`. The SDK entrypoint is ungated; the + `AGENTA_AGENT_ENABLE_MCP` deployment gate is applied service-side (the service's + `resolve_mcp_servers` wrapper returns `[]` when disabled), since the flag is a service + deployment concern, not an SDK one. +- `resolve_secrets()`: the harness/model provider keys, mapped to env vars. Optional by + design (see D6). + +### Responsibility 2: Execution wiring (the backend owns this) + +Given the resolved specs, each backend decides how its runtime executes each tool: + +- sandbox-agent backend (remote runner): gateway tools call back to `/tools/call`; code tools + ship to the runner. +- local Pi backend: code tools are written as files Pi runs; gateway tools still call back to + the platform when the env is connected (offline gateway is not possible, D1). 
+- a future cloud path: whatever that backend needs. + +Gateway tools are intrinsically platform-executed: any backend that runs them calls +`/tools/call`. The "callback vs file" choice is real only for code tools, which already do +not use a callback. So this boundary is mostly already honored; the work is to make it +explicit and fix the one wire-contract bug (D2). + +## The plan (phased) + +A and B are pure relocation, behavior-preserving, shippable with no flag. C rewires the +service onto the new entrypoints. D, E, F are boundary cleanup and the leftover comments. + +### Phase A: a `PlatformConnection` in the SDK +Create `agents/platform/` and move `client.py` there as an injected `PlatformConnection` +object (base URL + auth + timeout), not ambient globals. Resolution precedence: an explicit +connection, then per-request context (`RunningContext` / propagation), then the host fallback. +Per-user auth never from a global (the credential rule above). Access the singleton lazily, at +call time, never at import time (avoids the `agenta` import cycle). *Risk: low. Tests: both +credential sources plus a multi-user isolation test.* + +### Phase B: move the platform-backed resolvers into `agents/platform/` +- `AgentaGatewayToolResolver` (from `tools/gateway.py`), implementing `GatewayToolResolver`. +- The named-secret provider (from `tools/secrets.py`), implementing `ToolSecretProvider`, + backed by the PSE `/secrets/resolve` endpoint (assumed to exist). +- The provider-key fetch (from `secrets.py`), shared with `vault.py` so there is one client, + one cache, one parser. Provider keys stay optional (D6). + +All depend on the Phase A connection. The service deletes its copies and imports from the SDK. +*Risk: low to moderate. 
Tests: SDK unit tests with httpx mocked; existing service tests stay +green.* + +### Phase C: expose the three entrypoints and rewire the service +Replace `resolve_agent_resources` with `resolve_tools` / `resolve_mcp` / `resolve_secrets` in +the SDK. The service `app.py` calls the three directly when building `SessionConfig`. The +service `tools/` package shrinks to nothing or thin re-exports. *Risk: moderate (touches +app.py wiring). Tests: golden wire-contract stays byte-identical.* + +### Phase D: make the boundary explicit, fix the wire invariant +State and enforce the contract: resolution returns execution-neutral specs; the backend +assembles transport. **Keep `ToolCallback` in the gateway resolver** (D2): the gateway +callback endpoint is always the platform's `/tools/call`, intrinsic to a gateway tool, so +there is only one possible transport and no real choice to defer. Document code-tool delivery +(file vs callback) as the backend's choice. **Narrow the runner wire invariant**: change +`toolCallback` from "required when `customTools` is set" to "required only when a gateway +(callback) spec is present" (`services/agent/src/protocol.ts`), since code tools run without +`/tools/call`. *Risk: low.* + +### Phase E: close the harness-secret duplication +Add a route-level test that proves `RunningContext.credentials` is populated on `/invoke` and +`/messages`. If it is, drop the hand-rolled re-fetch and read from context like other workflow +services, and correct the stale `secrets.py` docstring. If it is not, both paths share the one +SDK provider-key helper from Phase B. *Risk: gated on the test.* + +### Phase F: the leftover architecture comments +- `app.py:100` (prompt vs stream asymmetry): unify so both paths own setup/cleanup the same + way (the batch path collects from the same helper the stream path uses). 
+- `app.py:136` ("feels outdated"): verify the `agenta:builtin:agent:v0` "future work" note + against the catalog-type work that landed; update or delete. +- `gateway.py:54` ("right patterns?"): resolved by A and B (align on the SDK client/vault + convention). + +## Decisions (resolved) + +- **D1: local + gateway.** Gateway tools (Composio) require Agenta and only work connected. + Code and builtin tools do not need Agenta. Offline gateway fails clearly rather than + silently skipping. Local backend therefore has two documented modes: offline (builtin + + code) and connected (adds gateway). +- **D2: callback assembly.** Keep it simple: the resolver builds the gateway `ToolCallback`. + The endpoint is intrinsic to gateway tools, so there is nothing for the backend to decide. + The only concrete fix is narrowing the runner wire invariant (Phase D). +- **D3: where resolution runs.** Explicit caller call. No hidden harness or environment magic. + The service and local paths call the same functions. +- **D4: credential/host source of truth.** Host from the singleton or env (global, shared); + per-user auth from per-request context (`RunningContext` / propagation), never from a + global. Precedence: explicit connection, then request context, then host fallback. Verified + by the Phase E route test. +- **D5: package layout.** Three separate resolvers, each in its own place, no aggregate. Pure + models, ports, and the `ToolResolver` framework stay in `agents/tools`. The Agenta-platform + HTTP adapters move to a new `agents/platform`. MCP stays in `agents/mcp`. +- **D6: provider keys are optional.** A user may run their own sidecar with a self-managed + Codex or Claude Code subscription, so model auth does not always come from the vault. + `resolve_secrets` fills provider keys when the vault has them and returns empty when it does + not, and the harness falls back to its own login or OAuth. Provider-key fetching is shared + with `vault.py` (one cache/parser). 
Tests must cover both paths: vault-provided keys, and + the self-managed-subscription case where no key is injected. + +## How this maps to the review comments + +| Comment | Where it lands | +|---|---| +| `resolver.py:39` resolution should be in the SDK | Phases A to C | +| `resolver.py:75` MCP mixed in / wrong place | Phase C + D5 (separate `resolve_mcp`; the `AGENTA_AGENT_ENABLE_MCP` gate stays service-side) | +| `gateway.py:54` right API patterns? | Phases A and B (align on the SDK client/vault convention) | +| `app.py:100` ugly prompt/stream split | Phase F | +| `app.py:136` feels outdated | Phase F | + +## Non-goals + +- Implementing `LocalBackend.create_session` (Phase 3/4 runner-delivery work). This plan makes + resolution available to it, not the backend itself. +- Offline gateway-tool execution (D1). +- Changing the wire protocol beyond narrowing the `toolCallback` invariant (Phase D). + +## Test & rollout + +- SDK unit tests for each relocated piece (httpx mocked), mirroring the existing service tests. + Run via `uv run python -m pytest ... -n0`. +- A multi-user isolation test for `PlatformConnection` (two callers, two credentials, no + bleed). +- A route-level test for `RunningContext.credentials` on `/invoke` and `/messages` (gates + Phase E). +- Provider-key tests for both the vault path and the self-managed-subscription path (D6). +- Keep the golden wire-contract test byte-identical through A to C (pure relocation). +- Land A and B first (behavior-preserving), then C, then D to F. diff --git a/sdks/python/agenta/sdk/agents/platform/__init__.py b/sdks/python/agenta/sdk/agents/platform/__init__.py new file mode 100644 index 0000000000..13ecd64b58 --- /dev/null +++ b/sdks/python/agenta/sdk/agents/platform/__init__.py @@ -0,0 +1,34 @@ +"""Agenta-platform-backed adapters for agent tool/secret resolution. 
+ +This package holds the implementations that reach the Agenta backend over HTTP: the +:class:`PlatformConnection` (base URL + per-call auth), the gateway tool resolver, the +named-secret provider, and the provider-key fetch, plus the three resolution entrypoints +(:func:`resolve_tools`, :func:`resolve_mcp`, :func:`resolve_secrets`). The pure resolution +framework and the neutral models stay in ``agenta.sdk.agents.tools``; only the platform-bound +code lives here. + +Kept out of ``agenta.sdk.agents.__init__`` eager exports on purpose: these modules reach +into ``agenta``/the SDK singleton, so importing them lazily (``from agenta.sdk.agents.platform +import ...``) avoids re-entering ``agenta``'s own import. +""" + +from .connection import PlatformConnection, default_timeout +from .gateway import AgentaGatewayToolResolver +from .resolve import resolve_mcp, resolve_secrets, resolve_tools +from .secrets import ( + AgentaNamedSecretProvider, + resolve_named_secrets, + resolve_provider_keys, +) + +__all__ = [ + "PlatformConnection", + "default_timeout", + "AgentaGatewayToolResolver", + "AgentaNamedSecretProvider", + "resolve_named_secrets", + "resolve_provider_keys", + "resolve_tools", + "resolve_mcp", + "resolve_secrets", +] diff --git a/sdks/python/agenta/sdk/agents/platform/connection.py b/sdks/python/agenta/sdk/agents/platform/connection.py new file mode 100644 index 0000000000..2556e5aba6 --- /dev/null +++ b/sdks/python/agenta/sdk/agents/platform/connection.py @@ -0,0 +1,152 @@ +"""How agent tool/secret resolution reaches the Agenta backend. + +:class:`PlatformConnection` carries the base URL and the per-call authorization that the +platform-backed resolvers (gateway tools, named secrets, provider keys) use. It exists so +the Agenta service and a standalone SDK user resolve against the platform the same way. 
def default_timeout() -> float:
    """Return the backend round-trip budget in seconds.

    Reads ``AGENTA_AGENT_TOOLS_TIMEOUT``. Falls back to ``DEFAULT_TOOLS_TIMEOUT`` (with a
    warning) when the value is malformed: non-numeric text, ``nan``, zero, or a negative
    number. An unset variable falls back silently.
    """
    raw = os.getenv("AGENTA_AGENT_TOOLS_TIMEOUT")
    if raw is None:
        return DEFAULT_TOOLS_TIMEOUT

    try:
        value = float(raw)
    except ValueError:
        value = None

    # ``float`` happily parses "nan", "0" and "-3", none of which is a usable budget.
    # ``nan > 0`` is False, so this single comparison rejects nan, zero and negatives.
    if value is not None and value > 0:
        return value

    log.warning(
        "agent: invalid AGENTA_AGENT_TOOLS_TIMEOUT %r; using %s",
        raw,
        DEFAULT_TOOLS_TIMEOUT,
    )
    return DEFAULT_TOOLS_TIMEOUT
class PlatformConnection:
    """Where the platform-backed resolvers send requests, and as whom.

    Every constructor argument is optional. A pinned ``base_url`` / ``authorization`` /
    ``timeout`` wins; anything left unset is looked up again on each access (SDK config,
    per-request context, env), so nothing derived is ever cached on the instance and a
    long-lived connection always reflects the current caller.
    """

    def __init__(
        self,
        *,
        base_url: Optional[str] = None,
        authorization: Optional[str] = None,
        timeout: Optional[float] = None,
    ) -> None:
        # Store the pinned URL without trailing slashes so callers can append paths.
        if base_url:
            self._base_url = base_url.rstrip("/")
        else:
            self._base_url = None
        self._authorization = authorization
        self._timeout = timeout

    @property
    def timeout(self) -> float:
        """The pinned timeout, or the env-configured default when none was pinned."""
        if self._timeout is None:
            return default_timeout()
        return self._timeout

    def base_url(self) -> Optional[str]:
        """Return the pinned base URL, else the one derived from SDK config/env (or ``None``)."""
        if self._base_url:
            return self._base_url
        return _derive_base_url()

    def authorization(self) -> Optional[str]:
        """Return the pinned Authorization, else the one derived for the current call."""
        if self._authorization:
            return self._authorization
        return _derive_authorization()

    def headers(
        self, *, json: bool = True, authorization: Optional[str] = None
    ) -> Dict[str, str]:
        """Build the headers for one backend call.

        ``json`` adds the JSON content type. ``authorization`` lets the caller reuse a
        credential it already resolved; when it is ``None`` the credential is resolved
        here. The Authorization header is omitted when the credential is empty.
        """
        if authorization is None:
            authorization = self.authorization()

        result: Dict[str, str] = {}
        if json:
            result["Content-Type"] = "application/json"
        if authorization:
            result["Authorization"] = authorization
        return result
class AgentaGatewayToolResolver:
    """`GatewayToolResolver` backed by the Agenta platform's `/tools/resolve` endpoint.

    Sends the gateway tool declarations to the platform, validates that exactly one spec
    comes back per declaration, and returns callback specs whose calls are routed to the
    platform's `/tools/call` endpoint.
    """

    def __init__(self, connection: Optional[PlatformConnection] = None) -> None:
        # Without an injected connection, fall back to one that resolves everything lazily.
        self._connection = connection or PlatformConnection()

    async def resolve(
        self,
        tools: Sequence[GatewayToolConfig],
    ) -> GatewayToolResolution:
        """Resolve gateway tool configs into callback specs plus the shared callback.

        The returned specs follow the order of ``tools``. The schema and description come
        from the platform response; ``needs_approval`` and ``render`` come from the config.

        Raises:
            UnsupportedToolProviderError: a config names a provider other than ``composio``.
            GatewayToolResolutionError: the base URL is unknown, two configs normalize to
                the same reference, the request fails or returns HTTP >= 400, the body is
                not valid JSON, or the returned specs do not match the requested
                references one-to-one (wrong count, non-object, missing ``call_ref`` or
                ``name``, duplicate or absent reference).
        """
        # Reject unsupported providers before any network or config lookup.
        for tool_config in tools:
            if tool_config.provider != "composio":
                raise UnsupportedToolProviderError(tool_config.provider)

        api_base = self._connection.base_url()
        if not api_base:
            error = GatewayToolResolutionError(
                "Agent has gateway tools configured but the Agenta API base URL "
                "is unknown. Set AGENTA_AGENT_TOOLS_API_URL or AGENTA_API_URL."
            )
            log.warning("agent: gateway tool resolution failed: %s", error)
            raise error

        # Resolve the credential once and reuse it for both the request header and the
        # ToolCallback, so they cannot diverge across the two reads.
        # NOTE(review): if this resolves to None, ``headers()`` treats None as "not
        # supplied" and resolves the credential itself — confirm that second read is fine.
        authorization = self._connection.authorization()
        headers = self._connection.headers(authorization=authorization)

        references = [_to_gateway_reference(tool_config) for tool_config in tools]
        # Index the configs by normalized reference so the response can be matched back;
        # insertion order (the order of ``tools``) later drives the output order.
        configs_by_reference: dict[str, GatewayToolConfig] = {}
        for tool_config in tools:
            reference = _normalize_reference(tool_config.reference)
            if reference in configs_by_reference:
                error = GatewayToolResolutionError(
                    f"Duplicate gateway reference: {reference}",
                    reference=reference,
                )
                log.warning("agent: %s", error)
                raise error
            configs_by_reference[reference] = tool_config

        try:
            async with httpx.AsyncClient(timeout=self._connection.timeout) as client:
                response = await client.post(
                    f"{api_base}/tools/resolve",
                    json={"tools": references},
                    headers=headers,
                )
        except httpx.HTTPError as exc:
            # Transport-level failure: log the tool count only, then surface a domain error.
            log.warning(
                "agent: gateway tool resolution request failed for %d tool(s)",
                len(tools),
                exc_info=True,
            )
            raise GatewayToolResolutionError(
                "Gateway tool resolution request failed",
                ref_count=len(tools),
            ) from exc

        if response.status_code >= 400:
            error = GatewayToolResolutionError(
                f"Gateway tool resolution failed (HTTP {response.status_code})",
                status=response.status_code,
                ref_count=len(tools),
            )
            log.warning("agent: %s", error)
            raise error

        try:
            payload = response.json() or {}
        except ValueError as exc:
            log.warning(
                "agent: gateway tool resolution returned invalid JSON",
                exc_info=True,
            )
            raise GatewayToolResolutionError(
                "Gateway tool resolution returned invalid JSON",
                ref_count=len(tools),
            ) from exc

        # Only the "custom" list is read here; any other shape is treated as "no specs",
        # which then fails the one-spec-per-ref count check below.
        raw_specs = payload.get("custom") if isinstance(payload, dict) else None
        if not isinstance(raw_specs, list):
            raw_specs = []
        if len(raw_specs) != len(tools):
            error = GatewayToolResolutionError(
                f"Gateway tool resolution returned {len(raw_specs)} spec(s) for "
                f"{len(tools)} ref(s); expected one per ref.",
                ref_count=len(tools),
                spec_count=len(raw_specs),
            )
            log.warning("agent: %s", error)
            raise error

        # Index the returned specs by normalized call_ref, rejecting malformed entries.
        specs_by_reference: dict[str, dict[str, Any]] = {}
        for raw_spec in raw_specs:
            if not isinstance(raw_spec, dict):
                error = GatewayToolResolutionError(
                    "Gateway tool resolution returned a non-object spec"
                )
                log.warning("agent: %s", error)
                raise error
            call_ref = raw_spec.get("call_ref")
            if not call_ref:
                error = GatewayToolResolutionError(
                    "Gateway tool resolution returned an incomplete spec "
                    f"(name={raw_spec.get('name')!r}, call_ref={call_ref!r})"
                )
                log.warning("agent: %s", error)
                raise error
            reference = _normalize_reference(str(call_ref))
            if reference in specs_by_reference:
                error = GatewayToolResolutionError(
                    f"Gateway tool resolution returned duplicate ref: {reference}",
                    reference=reference,
                )
                log.warning("agent: %s", error)
                raise error
            specs_by_reference[reference] = raw_spec

        # Pair each requested config with its returned spec, in request order.
        tool_specs: list[CallbackToolSpec] = []
        for reference, tool_config in configs_by_reference.items():
            raw_spec = specs_by_reference.get(reference)
            if raw_spec is None:
                error = GatewayToolResolutionError(
                    f"Gateway tool resolution did not return ref: {reference}",
                    reference=reference,
                )
                log.warning("agent: %s", error)
                raise error
            name = raw_spec.get("name")
            if not name:
                error = GatewayToolResolutionError(
                    f"Gateway tool resolution returned an incomplete spec for {reference}",
                    reference=reference,
                )
                log.warning("agent: %s", error)
                raise error
            tool_specs.append(
                CallbackToolSpec(
                    name=str(name),
                    # Missing description falls back to the name; missing schema to an
                    # empty object schema.
                    description=raw_spec.get("description") or str(name),
                    input_schema=raw_spec.get("input_schema")
                    or {"type": "object", "properties": {}},
                    # The original (un-normalized) call_ref is what gets forwarded.
                    call_ref=str(raw_spec["call_ref"]),
                    needs_approval=tool_config.needs_approval,
                    render=tool_config.render,
                )
            )

        return GatewayToolResolution(
            tool_specs=tool_specs,
            # One callback for all specs: same base URL and credential as the resolve call.
            tool_callback=ToolCallback(
                endpoint=f"{api_base}/tools/call",
                authorization=authorization,
            ),
        )
async def resolve_tools(
    tools: Sequence[Any],
    *,
    secret_provider: Optional[ToolSecretProvider] = None,
    gateway_resolver: Optional[GatewayToolResolver] = None,
    missing_secret_policy: MissingSecretPolicy = MissingSecretPolicy.ERROR,
) -> ResolvedToolSet:
    """Turn tool declarations into a resolved tool set.

    Any adapter the caller does not inject is replaced by its Agenta-platform-backed
    counterpart (``AgentaNamedSecretProvider`` / ``AgentaGatewayToolResolver``). The
    declarations are coerced with ``coerce_tool_configs`` before being resolved.
    """
    if not secret_provider:
        secret_provider = AgentaNamedSecretProvider()
    if not gateway_resolver:
        gateway_resolver = AgentaGatewayToolResolver()

    resolver = ToolResolver(
        secret_provider=secret_provider,
        gateway_resolver=gateway_resolver,
        missing_secret_policy=missing_secret_policy,
    )
    tool_configs = coerce_tool_configs(tools).tool_configs
    return await resolver.resolve(tool_configs)
async def resolve_named_secrets(
    names: Sequence[str],
    *,
    connection: Optional[PlatformConnection] = None,
) -> Dict[str, str]:
    """Resolve project vault secrets by name for tool and MCP environments. Best-effort.

    Posts the names to ``/secrets/resolve`` and returns ``{name: value}`` for the
    requested names that came back with a non-null value. Returns an empty dict when no
    names are given, when no base URL is configured, on HTTP >= 400, or on any
    request/parse failure (each failure is logged, never raised).

    Logs carry counts only, never secret names or values.
    """
    if not names:
        return {}

    connection = connection or PlatformConnection()
    api_base = connection.base_url()
    if not api_base:
        return {}

    try:
        async with httpx.AsyncClient(timeout=connection.timeout) as client:
            response = await client.post(
                f"{api_base}/secrets/resolve",
                json={"names": list(names)},
                headers=connection.headers(),
            )
        if response.status_code >= 400:
            log.warning(
                "agent: named-secret resolve HTTP %s for %d name(s)",
                response.status_code,
                len(names),
            )
            return {}
        data = response.json() or {}
    except Exception:  # pylint: disable=broad-except
        # Deliberately broad: an outage must degrade to "no secrets", not fail the run.
        log.warning(
            "agent: named-secret resolve failed for %d name(s)",
            len(names),
            exc_info=True,
        )
        return {}

    payload = data.get("secrets") if isinstance(data, dict) else None
    payload = payload if isinstance(payload, dict) else {}
    requested = {str(name) for name in names}
    # Restrict to the requested set so an upstream that returns extras never leaks
    # unrequested secrets into runtime memory.
    resolved = {
        str(key): str(value)
        for key, value in payload.items()
        if value is not None and str(key) in requested
    }
    # Count against what is actually returned: a null-valued secret is unresolved, and a
    # name requested twice is still one missing secret.
    missing = requested.difference(resolved)
    if missing:
        log.warning("agent: %d named secret(s) unresolved", len(missing))
    return resolved
+_PROVIDER_ENV_VARS = { + "openai": "OPENAI_API_KEY", + "anthropic": "ANTHROPIC_API_KEY", + "gemini": "GEMINI_API_KEY", + "mistral": "MISTRAL_API_KEY", + "mistralai": "MISTRAL_API_KEY", + "groq": "GROQ_API_KEY", + "together_ai": "TOGETHERAI_API_KEY", + "openrouter": "OPENROUTER_API_KEY", +} + + +async def resolve_provider_keys( + *, + connection: Optional[PlatformConnection] = None, +) -> Dict[str, str]: + """Fetch the project vault's provider keys as ``{ENV_VAR: key}``. Best-effort, optional. + + Empty when the vault has none, in which case the harness falls back to its own + login/OAuth (self-managed Pi/Claude sidecars), so absence is valid, never an error. + """ + connection = connection or PlatformConnection() + api_base = connection.base_url() + if not api_base: + return {} + + try: + async with httpx.AsyncClient(timeout=connection.timeout) as client: + response = await client.get( + f"{api_base}/secrets/", headers=connection.headers() + ) + if response.status_code >= 400: + log.warning("agent: vault secrets fetch HTTP %s", response.status_code) + return {} + secrets = response.json() or [] + except Exception: # pylint: disable=broad-except + log.warning("agent: vault secrets fetch failed", exc_info=True) + return {} + + env: Dict[str, str] = {} + for secret in secrets: + if not isinstance(secret, dict) or secret.get("kind") != "provider_key": + continue + data = secret.get("data") or {} + env_var = _PROVIDER_ENV_VARS.get(str(data.get("kind", "")).lower()) + key = (data.get("provider") or {}).get("key") + if env_var and key: + env.setdefault(env_var, key) + return env diff --git a/sdks/python/oss/tests/pytest/unit/agents/platform/__init__.py b/sdks/python/oss/tests/pytest/unit/agents/platform/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdks/python/oss/tests/pytest/unit/agents/platform/conftest.py b/sdks/python/oss/tests/pytest/unit/agents/platform/conftest.py new file mode 100644 index 0000000000..bd9883a4c4 --- /dev/null +++ 
b/sdks/python/oss/tests/pytest/unit/agents/platform/conftest.py @@ -0,0 +1,99 @@ +"""Fixtures for the platform-adapter tests: a fake httpx client and a pinned connection. + +These tests exercise the real adapter code against a mocked HTTP boundary (no live backend, +no respx/pytest-httpx dependency). ``fake_http`` patches ``httpx.AsyncClient`` on a given +adapter module and returns a ``capture`` dict the test asserts the outgoing request against. +The base URL and authorization are supplied by injecting a :class:`PlatformConnection`, not +by patching module globals, which is the adapter's real seam. +""" + +from __future__ import annotations + +import json +from typing import Any, Dict, Optional + +import pytest + +from agenta.sdk.agents.platform import PlatformConnection + +_ENV_VARS = ( + "AGENTA_AGENT_TOOLS_TIMEOUT", + "AGENTA_AGENT_TOOLS_API_URL", + "AGENTA_API_URL", + "AGENTA_API_KEY", +) + + +@pytest.fixture(autouse=True) +def _clean_env(monkeypatch): + """No ambient config leaks in, so an unset connection truly resolves to ``None``.""" + for name in _ENV_VARS: + monkeypatch.delenv(name, raising=False) + monkeypatch.setattr( + "agenta.sdk.engines.tracing.propagation.inject", + lambda carrier: carrier, + ) + + +@pytest.fixture +def connection() -> PlatformConnection: + """A connection pinned to a fake backend with an explicit caller credential.""" + return PlatformConnection(base_url="https://api.x/api", authorization="Access tok") + + +class _FakeResponse: + def __init__(self, status_code: int, payload: Any, text: Optional[str]) -> None: + self.status_code = status_code + self._payload = payload if payload is not None else {} + self.text = text if text is not None else json.dumps(self._payload) + + def json(self) -> Any: + return self._payload + + +def _fake_async_client(*, response, raises, capture: Dict[str, Any]): + class _Client: + def __init__(self, *args, **kwargs) -> None: + pass + + async def __aenter__(self): + return self + + async def 
__aexit__(self, *args): + return False + + async def post(self, url, json=None, headers=None): + capture.update(method="POST", url=url, json=json, headers=headers) + if raises: + raise raises + return response + + async def get(self, url, headers=None): + capture.update(method="GET", url=url, headers=headers) + if raises: + raise raises + return response + + return _Client + + +@pytest.fixture +def fake_http(monkeypatch): + def _install( + module, + *, + status: int = 200, + payload: Any = None, + text: Optional[str] = None, + raises: Optional[BaseException] = None, + ) -> Dict[str, Any]: + capture: Dict[str, Any] = {} + response = _FakeResponse(status, payload, text) + monkeypatch.setattr( + module.httpx, + "AsyncClient", + _fake_async_client(response=response, raises=raises, capture=capture), + ) + return capture + + return _install diff --git a/sdks/python/oss/tests/pytest/unit/agents/platform/test_connection.py b/sdks/python/oss/tests/pytest/unit/agents/platform/test_connection.py new file mode 100644 index 0000000000..03f0ca5353 --- /dev/null +++ b/sdks/python/oss/tests/pytest/unit/agents/platform/test_connection.py @@ -0,0 +1,140 @@ +"""Unit tests for the SDK platform connection (base URL + per-call authorization).""" + +from __future__ import annotations + +import pytest + +from agenta.sdk.agents.platform import PlatformConnection, default_timeout +from agenta.sdk.agents.platform.connection import DEFAULT_TOOLS_TIMEOUT + +# Env vars the connection reads; cleared per test so the host environment can't leak in. +_ENV_VARS = ( + "AGENTA_AGENT_TOOLS_TIMEOUT", + "AGENTA_AGENT_TOOLS_API_URL", + "AGENTA_API_URL", + "AGENTA_API_KEY", +) + + +@pytest.fixture(autouse=True) +def _clean_env(monkeypatch): + """Start each test from a known-empty config, with no ambient request credential.""" + for name in _ENV_VARS: + monkeypatch.delenv(name, raising=False) + # No per-request tracing context by default; tests opt in explicitly. 
+ monkeypatch.setattr( + "agenta.sdk.engines.tracing.propagation.inject", + lambda carrier: carrier, + ) + + +# --- timeout --------------------------------------------------------------- + + +def test_timeout_defaults_when_unset(): + assert default_timeout() == DEFAULT_TOOLS_TIMEOUT + assert PlatformConnection().timeout == DEFAULT_TOOLS_TIMEOUT + + +def test_timeout_reads_env(monkeypatch): + monkeypatch.setenv("AGENTA_AGENT_TOOLS_TIMEOUT", "5") + assert default_timeout() == 5.0 + + +def test_timeout_falls_back_on_malformed_env(monkeypatch): + monkeypatch.setenv("AGENTA_AGENT_TOOLS_TIMEOUT", "not-a-number") + assert default_timeout() == DEFAULT_TOOLS_TIMEOUT # guarded, no ValueError at use + + +def test_explicit_timeout_wins(monkeypatch): + monkeypatch.setenv("AGENTA_AGENT_TOOLS_TIMEOUT", "5") + assert PlatformConnection(timeout=12.0).timeout == 12.0 + + +# --- base URL -------------------------------------------------------------- + + +def test_base_url_explicit_overrides_everything(monkeypatch): + monkeypatch.setenv("AGENTA_API_URL", "https://env.example/api") + conn = PlatformConnection(base_url="https://explicit.example/api/") + assert conn.base_url() == "https://explicit.example/api" # trailing slash trimmed + + +def test_base_url_from_tools_api_url_env(monkeypatch): + monkeypatch.setenv("AGENTA_AGENT_TOOLS_API_URL", "https://tools.example/api/") + assert PlatformConnection().base_url() == "https://tools.example/api" + + +def test_base_url_from_api_url_env(monkeypatch): + monkeypatch.setenv("AGENTA_API_URL", "https://api.example/api/") + assert PlatformConnection().base_url() == "https://api.example/api" + + +def test_base_url_none_when_unconfigured(): + # No env, and a bare SDK has no configured OTLP endpoint to derive from. 
+ assert PlatformConnection().base_url() is None + + +# --- authorization (per call, never cached) -------------------------------- + + +def test_authorization_explicit_wins(monkeypatch): + monkeypatch.setenv("AGENTA_API_KEY", "envkey") + assert PlatformConnection(authorization="Bearer x").authorization() == "Bearer x" + + +def test_authorization_from_request_context(monkeypatch): + # The caller's Authorization rides on the tracing propagation, per request. + monkeypatch.setattr( + "agenta.sdk.engines.tracing.propagation.inject", + lambda carrier: {**carrier, "Authorization": "Bearer caller"}, + ) + monkeypatch.setenv( + "AGENTA_API_KEY", "envkey" + ) # context must win over the env fallback + assert PlatformConnection().authorization() == "Bearer caller" + + +def test_authorization_falls_back_to_process_api_key(monkeypatch): + monkeypatch.setenv("AGENTA_API_KEY", "envkey") + assert PlatformConnection().authorization() == "ApiKey envkey" + + +def test_authorization_none_when_nothing_available(): + assert PlatformConnection().authorization() is None + + +def test_authorization_resolved_per_call_not_cached(monkeypatch): + # A long-lived connection must reflect the current caller, not a value frozen at init. 
+ conn = PlatformConnection() + monkeypatch.setenv("AGENTA_API_KEY", "first") + assert conn.authorization() == "ApiKey first" + monkeypatch.setenv("AGENTA_API_KEY", "second") + assert conn.authorization() == "ApiKey second" + + +# --- headers --------------------------------------------------------------- + + +def test_headers_include_auth_when_present(monkeypatch): + monkeypatch.setenv("AGENTA_API_KEY", "k") + headers = PlatformConnection().headers() + assert headers["Content-Type"] == "application/json" + assert headers["Authorization"] == "ApiKey k" + + +def test_headers_omit_auth_when_absent(): + headers = PlatformConnection().headers() + assert "Authorization" not in headers + assert headers["Content-Type"] == "application/json" + + +def test_headers_can_skip_content_type(): + assert "Content-Type" not in PlatformConnection().headers(json=False) + + +def test_headers_reuse_explicit_authorization(monkeypatch): + # An explicit authorization is reused verbatim, not re-resolved from context/env. 
+ monkeypatch.setenv("AGENTA_API_KEY", "envkey") + headers = PlatformConnection().headers(authorization="Bearer pinned") + assert headers["Authorization"] == "Bearer pinned" diff --git a/sdks/python/oss/tests/pytest/unit/agents/platform/test_gateway_http.py b/sdks/python/oss/tests/pytest/unit/agents/platform/test_gateway_http.py new file mode 100644 index 0000000000..d41407a07d --- /dev/null +++ b/sdks/python/oss/tests/pytest/unit/agents/platform/test_gateway_http.py @@ -0,0 +1,153 @@ +"""The Agenta gateway tool resolver against a mocked ``POST /tools/resolve``.""" + +from __future__ import annotations + +import httpx +import pytest + +from agenta.sdk.agents import ( + GatewayToolConfig, + GatewayToolResolutionError, + ToolCallback, +) +from agenta.sdk.agents.platform import AgentaGatewayToolResolver, PlatformConnection +from agenta.sdk.agents.platform import gateway + + +def _resolver(connection): + return AgentaGatewayToolResolver(connection=connection) + + +def _gateway(**overrides) -> GatewayToolConfig: + base = dict(integration="github", action="GET_USER", connection="c1") + base.update(overrides) + return GatewayToolConfig(**base) + + +async def test_missing_api_base_raises_typed_error(): + resolver = _resolver(PlatformConnection()) # no base URL configured + with pytest.raises(GatewayToolResolutionError, match="API base URL"): + await resolver.resolve([_gateway()]) + + +async def test_gateway_metadata_and_description_fallback_are_preserved( + fake_http, connection +): + capture = fake_http( + gateway, + payload={ + "custom": [ + { + "name": "get_user", + "description": None, + "input_schema": {"type": "object"}, + "call_ref": "tools.composio.github.GET_USER.c1", + } + ] + }, + ) + resolved = await _resolver(connection).resolve( + [ + _gateway( + needs_approval=True, + render={"kind": "component", "component": "User"}, + ) + ] + ) + spec = resolved.tool_specs[0] + assert spec.description == "get_user" # falls back to name when null + assert 
spec.needs_approval is True + assert spec.render == {"kind": "component", "component": "User"} + assert spec.to_wire()["needsApproval"] is True + assert isinstance(resolved.tool_callback, ToolCallback) + assert resolved.tool_callback.endpoint == "https://api.x/api/tools/call" + assert resolved.tool_callback.authorization == "Access tok" + assert capture["url"] == "https://api.x/api/tools/resolve" + assert capture["json"]["tools"][0]["type"] == "gateway" + assert capture["headers"]["Authorization"] == "Access tok" + + +async def test_gateway_specs_are_joined_by_call_ref_not_position(fake_http, connection): + fake_http( + gateway, + payload={ + "custom": [ + { + "name": "second", + "description": "Second", + "input_schema": {}, + "call_ref": "tools.composio.github.SECOND.c2", + }, + { + "name": "first", + "description": "First", + "input_schema": {}, + "call_ref": "tools.composio.github.FIRST.c1", + }, + ] + }, + ) + resolved = await _resolver(connection).resolve( + [ + _gateway(action="FIRST", connection="c1", needs_approval=True), + _gateway( + action="SECOND", + connection="c2", + render={"kind": "component", "component": "Second"}, + ), + ] + ) + first, second = resolved.tool_specs + assert first.name == "first" + assert first.needs_approval is True + assert first.render is None + assert second.name == "second" + assert second.needs_approval is False + assert second.render == {"kind": "component", "component": "Second"} + + +async def test_transport_failure_is_logged_and_normalized( + fake_http, connection, monkeypatch +): + warnings: list = [] + monkeypatch.setattr( + gateway, + "log", + type( + "Log", + (), + {"warning": lambda self, *args, **kwargs: warnings.append(args)}, + )(), + ) + request = httpx.Request("POST", "https://api.x/api/tools/resolve") + fake_http(gateway, raises=httpx.ConnectError("offline", request=request)) + with pytest.raises(GatewayToolResolutionError) as caught: + await _resolver(connection).resolve([_gateway()]) + assert 
isinstance(caught.value.__cause__, httpx.ConnectError) + assert warnings + assert "gateway tool resolution request failed" in warnings[0][0].lower() + + +@pytest.mark.parametrize( + ("payload", "message"), + [ + ({"custom": []}, "expected one per ref"), + ( + {"custom": [{"name": "get_user", "description": "x", "input_schema": {}}]}, + "incomplete spec", + ), + ], +) +async def test_invalid_gateway_response_fails_fast( + fake_http, connection, payload, message +): + fake_http(gateway, payload=payload) + with pytest.raises(GatewayToolResolutionError, match=message): + await _resolver(connection).resolve([_gateway()]) + + +async def test_http_status_failure_is_typed(fake_http, connection): + fake_http(gateway, status=400, text="bad request") + with pytest.raises(GatewayToolResolutionError) as caught: + await _resolver(connection).resolve([_gateway()]) + assert caught.value.status == 400 diff --git a/sdks/python/oss/tests/pytest/unit/agents/platform/test_resolve.py b/sdks/python/oss/tests/pytest/unit/agents/platform/test_resolve.py new file mode 100644 index 0000000000..9c70159625 --- /dev/null +++ b/sdks/python/oss/tests/pytest/unit/agents/platform/test_resolve.py @@ -0,0 +1,45 @@ +"""The composition entrypoints: resolve_tools / resolve_mcp / resolve_secrets.""" + +from __future__ import annotations + +from typing import Mapping, Sequence + +from agenta.sdk.agents.platform import ( + resolve_provider_keys, + resolve_secrets, + resolve_tools, +) +from agenta.sdk.agents.platform import resolve_mcp + + +class _EmptySecrets: + async def get_many(self, names: Sequence[str]) -> Mapping[str, str]: + return {} + + +class _ExplodingGateway: + async def resolve(self, tools): + raise AssertionError( + "gateway resolver must not be called without gateway tools" + ) + + +async def test_resolve_tools_skips_gateway_without_gateway_tools(): + # No gateway tool ⇒ the gateway resolver (and its HTTP) is never touched. 
An exploding + # resolver proves the short-circuit: resolution completes without invoking it. + resolved = await resolve_tools( + ["read", {"type": "client", "name": "pick"}], + secret_provider=_EmptySecrets(), + gateway_resolver=_ExplodingGateway(), + ) + assert resolved.builtin_names == ["read"] + assert {spec.name for spec in resolved.tool_specs} == {"pick"} + + +async def test_resolve_mcp_empty_returns_empty(): + assert await resolve_mcp([], secret_provider=_EmptySecrets()) == [] + + +def test_resolve_secrets_is_the_provider_key_entrypoint(): + # The third entrypoint is the provider-key fetch (harness/model keys), not named secrets. + assert resolve_secrets is resolve_provider_keys diff --git a/sdks/python/oss/tests/pytest/unit/agents/platform/test_secrets_http.py b/sdks/python/oss/tests/pytest/unit/agents/platform/test_secrets_http.py new file mode 100644 index 0000000000..cd9c2fe32f --- /dev/null +++ b/sdks/python/oss/tests/pytest/unit/agents/platform/test_secrets_http.py @@ -0,0 +1,109 @@ +"""Named-secret and provider-key resolution against a mocked vault.""" + +from __future__ import annotations + +from agenta.sdk.agents.platform import ( + PlatformConnection, + resolve_named_secrets, + resolve_provider_keys, +) +from agenta.sdk.agents.platform import secrets + + +# --- named secrets (POST /secrets/resolve) --------------------------------- + + +async def test_named_secrets_are_resolved(fake_http, connection): + capture = fake_http( + secrets, + payload={"secrets": {"TOKEN": "value", "EMPTY": None}}, + ) + resolved = await resolve_named_secrets( + ["TOKEN", "EMPTY", "MISSING"], connection=connection + ) + assert resolved == {"TOKEN": "value"} + assert capture == { + "method": "POST", + "url": "https://api.x/api/secrets/resolve", + "json": {"names": ["TOKEN", "EMPTY", "MISSING"]}, + "headers": { + "Content-Type": "application/json", + "Authorization": "Access tok", + }, + } + + +async def test_named_secrets_restrict_to_requested(fake_http, connection): + # 
An upstream that returns extras must not leak unrequested secrets into memory. + fake_http( + secrets, + payload={"secrets": {"TOKEN": "value", "UNREQUESTED": "leak"}}, + ) + resolved = await resolve_named_secrets(["TOKEN"], connection=connection) + assert resolved == {"TOKEN": "value"} + + +async def test_named_secrets_without_api_base_return_empty(fake_http): + capture = fake_http(secrets) + assert await resolve_named_secrets(["TOKEN"], connection=PlatformConnection()) == {} + assert capture == {} # short-circuits before any HTTP + + +async def test_named_secret_http_failure_returns_empty(fake_http, connection): + fake_http(secrets, status=500) + assert await resolve_named_secrets(["TOKEN"], connection=connection) == {} + + +async def test_no_names_short_circuits(fake_http, connection): + capture = fake_http(secrets) + assert await resolve_named_secrets([], connection=connection) == {} + assert capture == {} + + +# --- provider keys (GET /secrets/) ----------------------------------------- + + +async def test_provider_keys_without_api_base_return_empty(fake_http): + assert await resolve_provider_keys(connection=PlatformConnection()) == {} + + +async def test_provider_keys_map_only_provider_keys_with_dedupe(fake_http, connection): + fake_http( + secrets, + payload=[ + { + "kind": "provider_key", + "data": {"kind": "openai", "provider": {"key": "sk-1"}}, + }, + # duplicate env var -> first one wins (setdefault). + { + "kind": "provider_key", + "data": {"kind": "openai", "provider": {"key": "sk-2"}}, + }, + { + "kind": "provider_key", + "data": {"kind": "anthropic", "provider": {"key": "sk-ant"}}, + }, + # not a provider key -> ignored. + {"kind": "other", "data": {"kind": "openai", "provider": {"key": "x"}}}, + # unmapped provider -> ignored. + { + "kind": "provider_key", + "data": {"kind": "made_up", "provider": {"key": "y"}}, + }, + # missing key -> ignored. 
+ {"kind": "provider_key", "data": {"kind": "groq", "provider": {}}}, + ], + ) + env = await resolve_provider_keys(connection=connection) + assert env == {"OPENAI_API_KEY": "sk-1", "ANTHROPIC_API_KEY": "sk-ant"} + + +async def test_provider_keys_http_error_returns_empty(fake_http, connection): + fake_http(secrets, status=400) + assert await resolve_provider_keys(connection=connection) == {} + + +async def test_provider_keys_network_exception_returns_empty(fake_http, connection): + fake_http(secrets, raises=RuntimeError("network down")) + assert await resolve_provider_keys(connection=connection) == {} diff --git a/services/entrypoints/main.py b/services/entrypoints/main.py index 72cc291dfb..f52ac69ed8 100644 --- a/services/entrypoints/main.py +++ b/services/entrypoints/main.py @@ -43,6 +43,7 @@ ) from oss.src.chat import chat_v0_app from oss.src.completion import completion_v0_app +from oss.src.agent import agent_v0_app from entrypoints.legacy import register_legacy_routes @@ -134,6 +135,7 @@ async def health(): app.mount("/chat/v0", chat_v0_app) app.mount("/completion/v0", completion_v0_app) +app.mount("/agent/v0", agent_v0_app) register_legacy_routes( app=app, diff --git a/services/oss/src/agent/__init__.py b/services/oss/src/agent/__init__.py new file mode 100644 index 0000000000..8a1b875183 --- /dev/null +++ b/services/oss/src/agent/__init__.py @@ -0,0 +1,13 @@ +"""The Agenta agent workflow app and its glue. + +The handler and backend wiring are in ``app``; tool resolution in ``tools``; provider +secrets in ``secrets``; trace/usage glue in ``tracing``; the ``/inspect`` schemas in +``schemas``; the file-backed defaults in ``config``. The engine-agnostic runtime (the +backend/environment/harness ports and their adapters) lives in the SDK at +``agenta.sdk.agents``; this package is the thin Agenta integration that feeds it resolved +tools, vault secrets, and a trace context. 
+""" + +from oss.src.agent.app import agent_v0_app, create_agent_app + +__all__ = ["agent_v0_app", "create_agent_app"] diff --git a/services/oss/src/agent/app.py b/services/oss/src/agent/app.py new file mode 100644 index 0000000000..4c21719acd --- /dev/null +++ b/services/oss/src/agent/app.py @@ -0,0 +1,150 @@ +"""Agent workflow app: the ``/invoke`` handler, wired onto the SDK agent runtime. + +Mirrors the chat/completion services: an Agenta app exposing ``/invoke`` and ``/inspect`` +through ``ag.create_app`` + ``ag.workflow`` + ``ag.route``. The handler parses the request +into a neutral ``AgentConfig`` + ``RunSelection`` (``agenta.sdk.agents``), resolves tools +(``tools``) and provider secrets (``secrets``) server-side, threads the trace context +(``tracing``), then runs one turn through a :class:`Harness` over a backend it picks from +the selection, and records the run's usage. + +The sandbox-agent-backed backend is the production path. The transport is a deployment +choice: HTTP to `AGENTA_AGENT_RUNNER_URL`, or a local runner CLI in a source checkout. +The harness, sandbox, and permission policy are editable playground config. 
+""" + +from typing import Any, Dict, List, Optional + +import agenta as ag + +from agenta.sdk.agents import ( + AgentConfig, + Backend, + Environment, + SandboxAgentBackend, + RunSelection, + SessionConfig, + make_harness, + to_messages, +) +from agenta.sdk.agents.adapters.vercel import agent_run_to_vercel_parts + +from agenta.sdk.agents.platform import resolve_secrets + +from oss.src.agent.config import load_config, runner_dir, runner_url +from oss.src.agent.schemas import AGENT_SCHEMAS +from oss.src.agent.tools import resolve_mcp_servers, resolve_tools +from oss.src.agent.tracing import record_usage, trace_context + + +def _default_agent_config() -> AgentConfig: + """The service's file defaults (AGENTS.md, model, tools) as a neutral AgentConfig.""" + file_cfg = load_config() + return AgentConfig( + instructions=file_cfg.agents_md, + model=file_cfg.model, + tools=file_cfg.tools, + ) + + +def select_backend(selection: RunSelection) -> Backend: + """Pick the backend for a run. + + The service always uses the sandbox-agent-backed runner. `AGENTA_AGENT_RUNNER_URL` + selects HTTP transport in deployed containers. When it is unset, local development + spawns the TypeScript runner CLI from the runner dir. + """ + return SandboxAgentBackend( + sandbox=selection.sandbox, + url=runner_url(), + cwd=str(runner_dir()), + ) + + +async def _agent( + inputs: Optional[Dict[str, Any]] = None, + messages: Optional[List[Any]] = None, + parameters: Optional[Dict] = None, + stream: Optional[bool] = None, + session_id: Optional[str] = None, +): + params = parameters or {} + + agent_config = AgentConfig.from_params(params, defaults=_default_agent_config()) + selection = RunSelection.from_params(params) + + msgs = to_messages(messages or (inputs or {}).get("messages") or []) + # Three independent resolutions (tools, MCP, provider-key secrets), not one aggregate: + # the boundary resolves; the backend later decides how each tool executes. 
+ resolved_tools = await resolve_tools(agent_config.tools) + resolved_mcp = await resolve_mcp_servers(agent_config.mcp_servers) + + session_config = SessionConfig( + agent=agent_config, + secrets=await resolve_secrets(), + permission_policy=selection.permission_policy, + trace=trace_context(), + session_id=session_id, + builtin_names=resolved_tools.builtin_names, + tool_specs=resolved_tools.tool_specs, + tool_callback=resolved_tools.tool_callback, + mcp_servers=resolved_mcp, + ) + + # The harness validates that the chosen backend can drive it. Unsupported combinations + # such as `agenta` on sandbox-agent fail here instead of silently changing runtime behavior. + # setup/cleanup own the backend lifecycle; prompt/stream run one cold turn. + harness = make_harness(selection.harness, Environment(select_backend(selection))) + + # Both paths hand off to a helper that owns the environment lifecycle (setup/cleanup). + # They differ only in shape, as they must: the `/messages` SSE path (`stream` set) returns + # the Vercel UI Message Stream as an async generator the normalizer turns into a streaming + # response; `/invoke` and the `/messages` JSON path return the batch assistant message. + if stream: + return _agent_vercel_stream(harness, session_config, msgs) + return await _agent_batch(harness, session_config, msgs) + + +async def _agent_batch(harness, session_config, msgs): + """Run one batch turn and return the assistant message. Owns the environment lifecycle.""" + await harness.setup() + try: + result = await harness.prompt(session_config, msgs) + finally: + await harness.cleanup() + record_usage(result.usage) + return {"role": "assistant", "content": result.output} + + +async def _agent_vercel_stream(harness, session_config, msgs): + """Run one streaming turn and yield Vercel UI Message Stream parts. + + Owns the environment lifecycle (``setup`` / ``cleanup``); the per-turn session is torn + down by the ``AgentRun``'s own cleanup hook when the stream drains. 
The ``session_id`` is + stamped onto the stream's ``start`` part by the endpoint, so it is not threaded here. + """ + await harness.setup() + try: + run = await harness.stream(session_config, msgs) + async for part in agent_run_to_vercel_parts(run): + yield part + try: + record_usage(run.result().usage) + except Exception: # result unavailable on a failed/aborted stream + pass + finally: + await harness.cleanup() + + +def create_agent_app(): + app = ag.create_app() + # The builtin agent workflow interface (`agenta:builtin:agent:v0`, `agent_v0_interface` + # in the SDK) now exists, but this service still registers the handler directly, so it + # gets an auto URI (`user:custom:...`) and runs locally. Binding the handler to the + # builtin URI is the remaining step. + routed = ag.workflow(schemas=AGENT_SCHEMAS)(_agent) + # is_agent gates the agent-only `/messages` + `/load-session` routes (next to /invoke). + ag.route("/", app=app, flags={"is_chat": True, "is_agent": True})(routed) + return app + + +agent_v0_app = create_agent_app() diff --git a/services/oss/src/agent/client.py b/services/oss/src/agent/client.py new file mode 100644 index 0000000000..59ec7969b4 --- /dev/null +++ b/services/oss/src/agent/client.py @@ -0,0 +1,63 @@ +"""Access to the Agenta backend from inside a harness run. + +Resolving the backend base URL and the caller-scoped credential is shared by the tool +resolver and the secret resolver, so it lives here. The credential reuses the same +propagation the OTLP export rides on, so an agent run calls ``/tools/resolve``, +``/tools/call``, and ``/secrets/`` as the caller, not with broader rights. +""" + +import os +from typing import Optional + +import agenta as ag +from agenta.sdk.engines.tracing.propagation import inject + +# Budget for a backend round-trip (the tool catalog/connection check, the vault fetch). 
+TOOLS_TIMEOUT = float(os.getenv("AGENTA_AGENT_TOOLS_TIMEOUT", "30")) + + +def agenta_api_base() -> Optional[str]: + """Resolve the Agenta backend base URL (``.../api``). + + Prefers an explicit override, then derives it from the OTLP endpoint the SDK is + configured with (``{host}/api/otlp/v1/traces``), then falls back to env. Returns + ``None`` when nothing is configured; callers only need this when tools or secrets apply. + """ + override = os.getenv("AGENTA_AGENT_TOOLS_API_URL") + if override: + return override.rstrip("/") + + try: + otlp_url = ag.tracing.otlp_url + except Exception: # pylint: disable=broad-except + otlp_url = None + if otlp_url and "/otlp/" in otlp_url: + return otlp_url.split("/otlp/", 1)[0].rstrip("/") + + api_url = os.getenv("AGENTA_API_URL") + if api_url: + return api_url.rstrip("/") + + return None + + +def request_authorization() -> Optional[str]: + """The project-scoped credential to call the Agenta backend. + + Reuses the same propagation the OTLP credential rides on (the caller's Authorization), + falling back to the service's own API key the way the tracing sidecar does. Scoping to + the caller keeps an agent run from invoking tools the user could not (WP-7 risk: + RUN_TOOLS scoping). + """ + try: + authorization = inject({}).get("Authorization") + except Exception: # pylint: disable=broad-except + authorization = None + if authorization: + return authorization + + api_key = os.getenv("AGENTA_API_KEY") + if api_key: + return f"ApiKey {api_key}" + + return None diff --git a/services/oss/src/agent/config.py b/services/oss/src/agent/config.py new file mode 100644 index 0000000000..0f5814f6a9 --- /dev/null +++ b/services/oss/src/agent/config.py @@ -0,0 +1,78 @@ +"""Hardcoded MVP agent config, read from ``services/agent/config``. + +The config (AGENTS.md text, model, tools) lives in editable files so changing the +agent does not need a code change. Paths can be overridden with env vars for Docker +or alternate layouts. 
+""" + +import json +import os +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, List, Optional + +# services/oss/src/agent/config.py -> parents[3] == services/ +_SERVICES_DIR = Path(__file__).resolve().parents[3] +_DEFAULT_AGENT_DIR = _SERVICES_DIR / "agent" + +# Fallback config used when the editable files are missing or a field is absent. +# Kept in sync with the catalog template and the `/inspect` schema defaults +# (schemas.py: _DEFAULT_MODEL / _DEFAULT_AGENTS_MD). +DEFAULT_MODEL = "gpt-5.5" +DEFAULT_AGENTS_MD = ( + "You are a friendly hello-world agent running on the Agenta agent service.\n\n" + "- Greet the user warmly.\n" + "- Answer the user's message in one or two short sentences." +) + + +@dataclass +class AgentConfig: + agents_md: str + model: Optional[str] = None + # Provider-agnostic tool references (WP-7). Each entry is either a plain string + # (a Pi built-in name, normalized to a ``builtin`` ref downstream) or a + # discriminated dict (``{"type": "composio", ...}``). Resolution happens in the + # backend at invoke time; the service just forwards the list. 
+ tools: List[Any] = field(default_factory=list) + + +def runner_dir() -> Path: + """Directory of the TypeScript agent runner (where the CLI command runs).""" + override = os.getenv("AGENTA_AGENT_RUNNER_DIR") + return Path(override) if override else _DEFAULT_AGENT_DIR + + +def runner_url() -> Optional[str]: + """HTTP URL for the deployed agent runner service, when configured.""" + value = os.getenv("AGENTA_AGENT_RUNNER_URL") + return value.strip() if value and value.strip() else None + + +def config_dir() -> Path: + """Directory holding AGENTS.md and agent.json.""" + override = os.getenv("AGENTA_AGENT_CONFIG_DIR") + return Path(override) if override else (_DEFAULT_AGENT_DIR / "config") + + +def load_config() -> AgentConfig: + base = config_dir() + + # Read the editable AGENTS.md when present; otherwise fall back to the default + # instructions so a fresh checkout (or Docker layout) still runs. + agents_md = DEFAULT_AGENTS_MD + agents_path = base / "AGENTS.md" + if agents_path.exists(): + text = agents_path.read_text(encoding="utf-8").strip() + if text: + agents_md = text + + model: str = DEFAULT_MODEL + tools: List[str] = [] + meta_path = base / "agent.json" + if meta_path.exists(): + meta = json.loads(meta_path.read_text(encoding="utf-8")) + model = meta.get("model") or DEFAULT_MODEL + tools = meta.get("tools", []) or [] + + return AgentConfig(agents_md=agents_md, model=model, tools=tools) diff --git a/services/oss/src/agent/schemas.py b/services/oss/src/agent/schemas.py new file mode 100644 index 0000000000..7047734a01 --- /dev/null +++ b/services/oss/src/agent/schemas.py @@ -0,0 +1,82 @@ +"""JSON schemas the agent workflow advertises via ``/inspect``. + +The agent self-describes its interface here instead of registering a static SDK +interface. The shape mirrors the chat workflow (messages in, a single assistant +message out) so the playground renders a chat box and POSTs `data.inputs.messages`. 
+ +Kept in its own module so it composes into the workflow registration with a one-line +change and stays out of the handler logic. +""" + +_SCHEMA = "https://json-schema.org/draft/2020-12/schema" + +# Default config the playground pre-fills and the agent falls back to. Kept in sync +# with the catalog template and ``config.py`` (DEFAULT_MODEL / DEFAULT_AGENTS_MD). +_DEFAULT_MODEL = "gpt-5.5" +_DEFAULT_AGENTS_MD = ( + "You are a friendly hello-world agent running on the Agenta agent service.\n\n" + "- Greet the user warmly.\n" + "- Answer the user's message in one or two short sentences." +) + +# Inputs: a chat-style message list. `x-ag-type-ref: messages` is what marks the +# workflow as chat to the playground (same marker the builtin chat service uses). +AGENT_INPUTS_SCHEMA = { + "$schema": _SCHEMA, + "type": "object", + "additionalProperties": True, + "properties": { + "messages": { + "x-ag-type-ref": "messages", + "type": "array", + "description": "Ordered list of normalized chat messages.", + }, + }, +} + +# The agent config element: one composite control the playground renders for the whole +# agent config, instead of reusing `prompt-template` plus loose params. The field shape is +# the `agent_config` catalog type (AgentConfigSchema in agenta.sdk.utils.types), so this is a +# thin `x-ag-type-ref` the playground resolves against `/workflows/catalog/types/agent_config` +# and dispatches to the AgentConfigControl (web/packages/agenta-entity-ui/.../AgentConfigControl.tsx). +# The catalog type keeps the typed tools/mcp_servers shape in one place; this schema only +# carries the default that the playground pre-fills. The agent handler reads it from +# `parameters.agent` in app.py. 
+_DEFAULT_AGENT_CONFIG = { + "agents_md": _DEFAULT_AGENTS_MD, + "model": _DEFAULT_MODEL, + "tools": [], + "mcp_servers": [], + "harness": "pi", + "sandbox": "local", + "permission_policy": "auto", +} + +AGENT_CONFIG_SCHEMA = { + "type": "object", + "x-ag-type-ref": "agent_config", + "title": "Agent", + "description": "The agent's instructions, model, tools, MCP servers, and runtime.", + "default": _DEFAULT_AGENT_CONFIG, +} + +AGENT_PARAMETERS_SCHEMA = { + "$schema": _SCHEMA, + "type": "object", + "additionalProperties": True, + "properties": {"agent": AGENT_CONFIG_SCHEMA}, +} + +# Outputs: the final assistant message. +AGENT_OUTPUTS_SCHEMA = { + "$schema": _SCHEMA, + "x-ag-type-ref": "message", + "type": "object", + "description": "Final assistant message returned by the agent.", +} + +AGENT_SCHEMAS = { + "inputs": AGENT_INPUTS_SCHEMA, + "parameters": AGENT_PARAMETERS_SCHEMA, + "outputs": AGENT_OUTPUTS_SCHEMA, +} diff --git a/services/oss/src/agent/secrets.py b/services/oss/src/agent/secrets.py new file mode 100644 index 0000000000..3a7e89e374 --- /dev/null +++ b/services/oss/src/agent/secrets.py @@ -0,0 +1,12 @@ +"""Harness provider-key resolution: now lives in the SDK platform package. + +Kept as a thin re-export so existing service imports keep working. ``resolve_harness_secrets`` +is the prior name for the SDK's ``resolve_provider_keys``. 
+""" + +from agenta.sdk.agents.platform.secrets import ( + _PROVIDER_ENV_VARS, + resolve_provider_keys as resolve_harness_secrets, +) + +__all__ = ["resolve_harness_secrets", "_PROVIDER_ENV_VARS"] diff --git a/services/oss/src/agent/tools/__init__.py b/services/oss/src/agent/tools/__init__.py new file mode 100644 index 0000000000..59a58d88e2 --- /dev/null +++ b/services/oss/src/agent/tools/__init__.py @@ -0,0 +1,14 @@ +"""Agent-service composition and adapters for tool resolution.""" + +from .gateway import AgentaGatewayToolResolver, _to_gateway_reference +from .resolver import resolve_mcp_servers, resolve_tools +from .secrets import VaultToolSecretProvider + +_gateway_ref = _to_gateway_reference + +__all__ = [ + "AgentaGatewayToolResolver", + "VaultToolSecretProvider", + "resolve_tools", + "resolve_mcp_servers", +] diff --git a/services/oss/src/agent/tools/gateway.py b/services/oss/src/agent/tools/gateway.py new file mode 100644 index 0000000000..c9b7de483c --- /dev/null +++ b/services/oss/src/agent/tools/gateway.py @@ -0,0 +1,19 @@ +"""Gateway tool resolver: now lives in the SDK platform package. + +Kept as a thin re-export so existing service imports +(``from oss.src.agent.tools import AgentaGatewayToolResolver``) keep working. The +implementation moved to ``agenta.sdk.agents.platform.gateway`` so a standalone SDK user with +a local backend resolves gateway tools the same way the service does. +""" + +from agenta.sdk.agents.platform.gateway import ( + AgentaGatewayToolResolver, + _normalize_reference, + _to_gateway_reference, +) + +__all__ = [ + "AgentaGatewayToolResolver", + "_to_gateway_reference", + "_normalize_reference", +] diff --git a/services/oss/src/agent/tools/resolver.py b/services/oss/src/agent/tools/resolver.py new file mode 100644 index 0000000000..8db19022b8 --- /dev/null +++ b/services/oss/src/agent/tools/resolver.py @@ -0,0 +1,37 @@ +"""Service-side resolution wiring. 
+ +The three resolution entrypoints now live in the SDK (``agenta.sdk.agents.platform``) so the +service and a standalone SDK user share them. ``resolve_tools`` is re-exported as-is; the +service only adds the MCP deployment gate (``AGENTA_AGENT_ENABLE_MCP``, off by default) on top +of the SDK's ``resolve_mcp``. +""" + +from __future__ import annotations + +import os +from typing import Any, List, Optional, Sequence + +from agenta.sdk.agents.mcp import ResolvedMCPServer +from agenta.sdk.agents.platform import resolve_mcp, resolve_tools +from agenta.sdk.agents.tools.interfaces import ToolSecretProvider +from agenta.sdk.utils.constants import TRUTHY + +__all__ = ["resolve_tools", "resolve_mcp_servers"] + + +def _mcp_enabled() -> bool: + return os.getenv("AGENTA_AGENT_ENABLE_MCP", "").strip().lower() in TRUTHY + + +async def resolve_mcp_servers( + mcp_servers: Sequence[Any], + *, + secret_provider: Optional[ToolSecretProvider] = None, +) -> List[ResolvedMCPServer]: + """Resolve MCP servers, gated by ``AGENTA_AGENT_ENABLE_MCP`` (off by default). + + Returns the resolved servers when enabled, an empty list when not. + """ + if not _mcp_enabled(): + return [] + return await resolve_mcp(mcp_servers, secret_provider=secret_provider) diff --git a/services/oss/src/agent/tools/secrets.py b/services/oss/src/agent/tools/secrets.py new file mode 100644 index 0000000000..addf2ae605 --- /dev/null +++ b/services/oss/src/agent/tools/secrets.py @@ -0,0 +1,12 @@ +"""Named-secret provider: now lives in the SDK platform package. + +Kept as a thin re-export so existing service imports keep working. ``VaultToolSecretProvider`` +is the prior name for the SDK's ``AgentaNamedSecretProvider``. 
+""" + +from agenta.sdk.agents.platform.secrets import ( + AgentaNamedSecretProvider as VaultToolSecretProvider, + resolve_named_secrets, +) + +__all__ = ["VaultToolSecretProvider", "resolve_named_secrets"] diff --git a/services/oss/src/agent/tracing.py b/services/oss/src/agent/tracing.py new file mode 100644 index 0000000000..7069381a35 --- /dev/null +++ b/services/oss/src/agent/tracing.py @@ -0,0 +1,85 @@ +"""OpenTelemetry glue: thread the workflow trace into the run, record the run's usage. + +The handler runs inside the instrumented ``/invoke`` span, so threading its trace context +into the harness makes the agent's spans children of that span (same trace), and stamping +the run's token/cost totals onto it shows the run's usage even though the harness exports +its span tree in a separate OTLP batch. +""" + +import os +from typing import Any, Dict, Optional + +from opentelemetry import trace as otel_trace + +import agenta as ag +from agenta.sdk.engines.tracing.propagation import inject +from agenta.sdk.utils.logging import get_module_logger + +from agenta.sdk.agents import TraceContext + +log = get_module_logger(__name__) + +_CAPTURE_CONTENT = os.getenv("AGENTA_AGENT_CAPTURE_CONTENT", "true").lower() not in ( + "0", + "false", + "no", +) + + +def trace_context() -> Optional[TraceContext]: + """Capture the active workflow span's trace context for the harness. + + Threading the ``/invoke`` span's ``traceparent`` into the run makes the agent's spans + children of that span, so the whole run shows up under the response's ``trace_id`` the + way completion/chat nest their LLM spans. Best-effort: any failure returns ``None`` and + the run is traced standalone (or not at all) using the runner's env config. 
+ """ + try: + headers = inject({}) + + traceparent = headers.get("traceparent") + if not traceparent: + return None + + endpoint = None + try: + endpoint = ag.tracing.otlp_url + except Exception: # pylint: disable=broad-except + endpoint = None + + return TraceContext( + traceparent=traceparent, + baggage=headers.get("baggage"), + endpoint=endpoint, + authorization=headers.get("Authorization"), + capture_content=_CAPTURE_CONTENT, + ) + except Exception: # pylint: disable=broad-except + log.warning("agent: failed to capture trace context", exc_info=True) + return None + + +def record_usage(usage: Optional[Dict[str, Any]]) -> None: + """Stamp the agent's token/cost totals onto the active ``/invoke`` workflow span. + + The harness emits its own span tree (turns, LLM, tools) in a separate OTLP batch, so + Agenta's per-batch cumulative roll-up cannot bridge the totals onto the workflow span. + Setting ``gen_ai.usage.*`` here records them directly on that span (the root of its + batch), so the trace shows the run's tokens and cost. Best-effort. 
+ """ + if not usage or not usage.get("total"): + return + try: + span = otel_trace.get_current_span() + input_tokens = int(usage.get("input") or 0) + output_tokens = int(usage.get("output") or 0) + span.set_attribute("gen_ai.usage.input_tokens", input_tokens) + span.set_attribute("gen_ai.usage.output_tokens", output_tokens) + span.set_attribute("gen_ai.usage.prompt_tokens", input_tokens) + span.set_attribute("gen_ai.usage.completion_tokens", output_tokens) + span.set_attribute("gen_ai.usage.total_tokens", int(usage.get("total") or 0)) + cost = usage.get("cost") + if cost: + span.set_attribute("gen_ai.usage.cost", float(cost)) + except Exception: # pylint: disable=broad-except + log.warning("agent: failed to record usage on workflow span", exc_info=True) diff --git a/services/oss/tests/pytest/integration/__init__.py b/services/oss/tests/pytest/integration/__init__.py new file mode 100644 index 0000000000..a78ea06af0 --- /dev/null +++ b/services/oss/tests/pytest/integration/__init__.py @@ -0,0 +1 @@ +# Integration tests package. diff --git a/services/oss/tests/pytest/integration/agent/__init__.py b/services/oss/tests/pytest/integration/agent/__init__.py new file mode 100644 index 0000000000..da89fd87e0 --- /dev/null +++ b/services/oss/tests/pytest/integration/agent/__init__.py @@ -0,0 +1 @@ +# Integration tests for the agent workflow service (httpx boundary mocked, no live backend). diff --git a/services/oss/tests/pytest/integration/agent/conftest.py b/services/oss/tests/pytest/integration/agent/conftest.py new file mode 100644 index 0000000000..dfc223343c --- /dev/null +++ b/services/oss/tests/pytest/integration/agent/conftest.py @@ -0,0 +1,76 @@ +"""Integration fixtures: a fake httpx client for the tool/secret resolvers. + +These tests wire the real resolver code against a mocked HTTP boundary (no live backend, no +respx/pytest-httpx dependency). 
``install_http`` patches, in a given resolver module, the two +``client`` helpers (``agenta_api_base`` / ``request_authorization``) plus ``httpx.AsyncClient``, +and returns a ``capture`` dict the test can assert the outgoing request against. +""" + +from __future__ import annotations + +import json +from typing import Any, Dict, Optional + +import pytest + + +class _FakeResponse: + def __init__(self, status_code: int, payload: Any, text: Optional[str]) -> None: + self.status_code = status_code + self._payload = payload if payload is not None else {} + self.text = text if text is not None else json.dumps(self._payload) + + def json(self) -> Any: + return self._payload + + +def _fake_async_client(*, response, raises, capture: Dict[str, Any]): + class _Client: + def __init__(self, *args, **kwargs) -> None: + pass + + async def __aenter__(self): + return self + + async def __aexit__(self, *args): + return False + + async def post(self, url, json=None, headers=None): + capture.update(method="POST", url=url, json=json, headers=headers) + if raises: + raise raises + return response + + async def get(self, url, headers=None): + capture.update(method="GET", url=url, headers=headers) + if raises: + raise raises + return response + + return _Client + + +@pytest.fixture +def install_http(monkeypatch): + def _install( + module, + *, + status: int = 200, + payload: Any = None, + text: Optional[str] = None, + raises: Optional[BaseException] = None, + api_base: Optional[str] = "https://api.x/api", + authorization: Optional[str] = "Access tok", + ) -> Dict[str, Any]: + capture: Dict[str, Any] = {} + monkeypatch.setattr(module, "agenta_api_base", lambda: api_base) + monkeypatch.setattr(module, "request_authorization", lambda: authorization) + response = _FakeResponse(status, payload, text) + monkeypatch.setattr( + module.httpx, + "AsyncClient", + _fake_async_client(response=response, raises=raises, capture=capture), + ) + return capture + + return _install diff --git 
a/services/oss/tests/pytest/integration/agent/test_resolve_secrets_http.py b/services/oss/tests/pytest/integration/agent/test_resolve_secrets_http.py new file mode 100644 index 0000000000..8eb4f45a01 --- /dev/null +++ b/services/oss/tests/pytest/integration/agent/test_resolve_secrets_http.py @@ -0,0 +1,64 @@ +"""``resolve_harness_secrets`` against a mocked ``GET /secrets/``. + +Best-effort by design: it maps only ``provider_key`` vault entries to env vars, dedupes by +env var, and returns ``{}`` on any error rather than failing the run. +""" + +from __future__ import annotations + +import pytest + +from oss.src.agent import secrets +from oss.src.agent.secrets import resolve_harness_secrets + +pytestmark = pytest.mark.integration + + +async def test_no_api_base_returns_empty(install_http): + install_http(secrets, api_base=None) + assert await resolve_harness_secrets() == {} + + +async def test_maps_only_provider_keys_with_dedupe(install_http): + install_http( + secrets, + status=200, + payload=[ + { + "kind": "provider_key", + "data": {"kind": "openai", "provider": {"key": "sk-1"}}, + }, + # duplicate env var -> first one wins (setdefault). + { + "kind": "provider_key", + "data": {"kind": "openai", "provider": {"key": "sk-2"}}, + }, + { + "kind": "provider_key", + "data": {"kind": "anthropic", "provider": {"key": "sk-ant"}}, + }, + # not a provider key -> ignored. + {"kind": "other", "data": {"kind": "openai", "provider": {"key": "x"}}}, + # unmapped provider -> ignored. + { + "kind": "provider_key", + "data": {"kind": "made_up", "provider": {"key": "y"}}, + }, + # missing key -> ignored. 
+ {"kind": "provider_key", "data": {"kind": "groq", "provider": {}}}, + ], + ) + + env = await resolve_harness_secrets() + + assert env == {"OPENAI_API_KEY": "sk-1", "ANTHROPIC_API_KEY": "sk-ant"} + + +async def test_http_error_returns_empty(install_http): + install_http(secrets, status=400) + assert await resolve_harness_secrets() == {} + + +async def test_network_exception_returns_empty(install_http): + install_http(secrets, raises=RuntimeError("network down")) + assert await resolve_harness_secrets() == {} diff --git a/services/oss/tests/pytest/integration/agent/tools/__init__.py b/services/oss/tests/pytest/integration/agent/tools/__init__.py new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/services/oss/tests/pytest/integration/agent/tools/__init__.py @@ -0,0 +1 @@ + diff --git a/services/oss/tests/pytest/integration/agent/tools/test_gateway_http.py b/services/oss/tests/pytest/integration/agent/tools/test_gateway_http.py new file mode 100644 index 0000000000..1664e8fcfe --- /dev/null +++ b/services/oss/tests/pytest/integration/agent/tools/test_gateway_http.py @@ -0,0 +1,170 @@ +from __future__ import annotations + +import httpx +import pytest + +from agenta.sdk.agents import ( + GatewayToolResolutionError, + ToolCallback, +) + +from oss.src.agent.tools import resolve_tools +from oss.src.agent.tools import gateway + +pytestmark = pytest.mark.integration + +_GATEWAY = { + "type": "gateway", + "provider": "composio", + "integration": "github", + "action": "GET_USER", + "connection": "c1", +} + + +async def test_no_gateway_short_circuits_without_http(install_http): + capture = install_http(gateway, raises=AssertionError("must not call HTTP")) + resolved = await resolve_tools(["read"]) + assert resolved.builtin_names == ["read"] + assert capture == {} + + +async def test_missing_api_base_raises_typed_error(install_http): + install_http(gateway, api_base=None) + with pytest.raises(GatewayToolResolutionError, match="API base URL"): + await 
resolve_tools([_GATEWAY]) + + +async def test_gateway_metadata_and_description_fallback_are_preserved(install_http): + capture = install_http( + gateway, + payload={ + "custom": [ + { + "name": "get_user", + "description": None, + "input_schema": {"type": "object"}, + "call_ref": "tools.composio.github.GET_USER.c1", + } + ] + }, + ) + resolved = await resolve_tools( + [ + { + **_GATEWAY, + "needs_approval": True, + "render": {"kind": "component", "component": "User"}, + } + ] + ) + spec = resolved.tool_specs[0] + assert spec.description == "get_user" + assert spec.needs_approval is True + assert spec.render == {"kind": "component", "component": "User"} + assert spec.to_wire()["needsApproval"] is True + assert isinstance(resolved.tool_callback, ToolCallback) + assert capture["json"]["tools"][0]["type"] == "gateway" + + +async def test_gateway_specs_are_joined_by_call_ref_not_position(install_http): + install_http( + gateway, + payload={ + "custom": [ + { + "name": "second", + "description": "Second", + "input_schema": {}, + "call_ref": "tools.composio.github.SECOND.c2", + }, + { + "name": "first", + "description": "First", + "input_schema": {}, + "call_ref": "tools.composio.github.FIRST.c1", + }, + ] + }, + ) + resolved = await resolve_tools( + [ + { + **_GATEWAY, + "action": "FIRST", + "connection": "c1", + "needs_approval": True, + }, + { + **_GATEWAY, + "action": "SECOND", + "connection": "c2", + "render": {"kind": "component", "component": "Second"}, + }, + ] + ) + first, second = resolved.tool_specs + assert first.name == "first" + assert first.needs_approval is True + assert first.render is None + assert second.name == "second" + assert second.needs_approval is False + assert second.render == {"kind": "component", "component": "Second"} + + +async def test_transport_failure_is_logged_and_normalized( + install_http, + monkeypatch, +): + warnings = [] + monkeypatch.setattr( + gateway, + "log", + type( + "Log", + (), + {"warning": lambda self, *args, **kwargs: 
warnings.append(args)}, + )(), + ) + request = httpx.Request("POST", "https://api.x/api/tools/resolve") + install_http(gateway, raises=httpx.ConnectError("offline", request=request)) + with pytest.raises(GatewayToolResolutionError) as caught: + await resolve_tools([_GATEWAY]) + assert isinstance(caught.value.__cause__, httpx.ConnectError) + assert warnings + assert "gateway tool resolution request failed" in warnings[0][0].lower() + + +@pytest.mark.parametrize( + ("payload", "message"), + [ + ({"custom": []}, "expected one per ref"), + ( + { + "custom": [ + { + "name": "get_user", + "description": "x", + "input_schema": {}, + } + ] + }, + "incomplete spec", + ), + ], +) +async def test_invalid_gateway_response_fails_fast( + install_http, + payload, + message, +): + install_http(gateway, payload=payload) + with pytest.raises(GatewayToolResolutionError, match=message): + await resolve_tools([_GATEWAY]) + + +async def test_http_status_failure_is_typed(install_http): + install_http(gateway, status=400, text="bad request") + with pytest.raises(GatewayToolResolutionError) as caught: + await resolve_tools([_GATEWAY]) + assert caught.value.status == 400 diff --git a/services/oss/tests/pytest/integration/agent/tools/test_secrets_http.py b/services/oss/tests/pytest/integration/agent/tools/test_secrets_http.py new file mode 100644 index 0000000000..2aea5678fb --- /dev/null +++ b/services/oss/tests/pytest/integration/agent/tools/test_secrets_http.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +import pytest + +from oss.src.agent.tools import secrets + +pytestmark = pytest.mark.integration + + +async def test_named_secrets_are_resolved_by_tools_adapter(install_http): + capture = install_http( + secrets, + payload={"secrets": {"TOKEN": "value", "EMPTY": None}}, + ) + + resolved = await secrets.resolve_named_secrets(["TOKEN", "EMPTY", "MISSING"]) + + assert resolved == {"TOKEN": "value"} + assert capture == { + "method": "POST", + "url": 
"https://api.x/api/secrets/resolve", + "json": {"names": ["TOKEN", "EMPTY", "MISSING"]}, + "headers": { + "Content-Type": "application/json", + "Authorization": "Access tok", + }, + } + + +async def test_named_secrets_without_api_base_return_empty(install_http): + install_http(secrets, api_base=None) + + assert await secrets.resolve_named_secrets(["TOKEN"]) == {} + + +async def test_named_secret_http_failure_returns_empty(install_http): + install_http(secrets, status=500) + + assert await secrets.resolve_named_secrets(["TOKEN"]) == {} diff --git a/services/oss/tests/pytest/unit/__init__.py b/services/oss/tests/pytest/unit/__init__.py new file mode 100644 index 0000000000..1a351fabba --- /dev/null +++ b/services/oss/tests/pytest/unit/__init__.py @@ -0,0 +1 @@ +# Unit tests package. diff --git a/services/oss/tests/pytest/unit/agent/__init__.py b/services/oss/tests/pytest/unit/agent/__init__.py new file mode 100644 index 0000000000..5da5a3fd3b --- /dev/null +++ b/services/oss/tests/pytest/unit/agent/__init__.py @@ -0,0 +1 @@ +# Unit tests for the agent workflow service (oss.src.agent). diff --git a/services/oss/tests/pytest/unit/agent/conftest.py b/services/oss/tests/pytest/unit/agent/conftest.py new file mode 100644 index 0000000000..f84c7b29df --- /dev/null +++ b/services/oss/tests/pytest/unit/agent/conftest.py @@ -0,0 +1,112 @@ +"""Fakes for the agent service unit tests. + +A local, minimal ``FakeBackend`` (≈ the SDK's) so the ``/invoke`` handler can run end-to-end +in-process with no runner, no LLM, and no network. It implements the real ``Backend`` / +``Sandbox`` / ``Session`` ports, so the port contract keeps it honest across the two suites. + +This conftest is scoped to ``unit/agent/`` so the handler tests do not pull the acceptance +suite's account / live-API fixtures from the services root conftest. 
+""" + +from __future__ import annotations + +from typing import Dict, Mapping, Optional, Sequence + +import pytest + +from agenta.sdk.agents import AgentResult, HarnessType +from agenta.sdk.agents.interfaces import Backend, Sandbox, Session +from agenta.sdk.agents.streaming import AgentRun + + +class _FakeSandbox(Sandbox): + def __init__(self) -> None: + self.files: Dict[str, bytes] = {} + self.destroyed = False + + async def add_files(self, files: Mapping[str, bytes]) -> None: + self.files.update(files) + + async def destroy(self) -> None: + self.destroyed = True + + +class _FakeSession(Session): + def __init__(self, result: AgentResult) -> None: + self._result = result + self.destroyed = False + + @property + def id(self) -> Optional[str]: + return self._result.session_id + + async def prompt(self, messages, *, on_event=None) -> AgentResult: + return self._result + + def stream(self, messages) -> AgentRun: + result = self._result + + async def _records(): + yield { + "kind": "result", + "result": {"ok": True, "output": result.output}, + } + + return AgentRun(_records()) + + async def destroy(self) -> None: + self.destroyed = True + + +class FakeBackend(Backend): + """Echoes a fixed result, regardless of harness. Records lifecycle for assertions. + + Crucially it also records the *harness-shaped* config each ``create_session`` receives + (the ``PiAgentConfig`` / ``ClaudeAgentConfig`` / ``AgentaAgentConfig`` the harness + produced). This is the backend boundary where per-harness translation surfaces, so a + handler test can assert the response body is identical across harnesses *and* that the + translated configs diverge as designed (Pi keeps built-ins and forces auto; Claude drops + built-ins and honors the policy; Agenta unions forced tools and carries skills). 
+ """ + + def __init__( + self, + *, + result: Optional[AgentResult] = None, + supported: Sequence[HarnessType] = ( + HarnessType.PI, + HarnessType.CLAUDE, + HarnessType.AGENTA, + ), + ) -> None: + self.supported_harnesses = frozenset(supported) + self._result = result if result is not None else AgentResult(output="echo") + self.setup_calls = 0 + self.shutdown_calls = 0 + # Every harness-shaped config that reached the backend boundary, in call order. + self.created_configs: list = [] + self.created_session_ids: list[Optional[str]] = [] + + async def setup(self) -> None: + self.setup_calls += 1 + + async def shutdown(self) -> None: + self.shutdown_calls += 1 + + async def create_sandbox(self) -> _FakeSandbox: + return _FakeSandbox() + + async def create_session( + self, sandbox, config, *, harness, secrets=None, trace=None, session_id=None + ) -> _FakeSession: + self.created_configs.append(config) + self.created_session_ids.append(session_id) + return _FakeSession(self._result) + + +@pytest.fixture +def fake_backend(): + def _make(**kwargs) -> FakeBackend: + return FakeBackend(**kwargs) + + return _make diff --git a/services/oss/tests/pytest/unit/agent/test_invoke_handler.py b/services/oss/tests/pytest/unit/agent/test_invoke_handler.py new file mode 100644 index 0000000000..bba91f7ebe --- /dev/null +++ b/services/oss/tests/pytest/unit/agent/test_invoke_handler.py @@ -0,0 +1,205 @@ +"""The ``/invoke`` handler (`_agent`) end-to-end in-process. + +Runs the real parse -> resolve -> harness -> record path with a ``FakeBackend`` and the +network-touching helpers stubbed. No runner, no LLM, no HTTP. This is where the cross-harness +"byte-identical response body" guarantee is locked at the Python layer. 
+""" + +from __future__ import annotations + +import pytest + +from agenta.sdk.agents import ( + AgentConfig, + AgentResult, + GatewayToolResolutionError, + ResolvedToolSet, +) +from agenta.sdk.agents.adapters.agenta_builtins import AGENTA_FORCED_SKILLS + +from oss.src.agent import app + + +def _patch_handler(monkeypatch, backend, *, builtins=(), tool_callback=None): + """Stub the network-touching helpers and pin one ``backend`` for the run. + + ``builtins`` are the resolved built-in tool names ``resolve_tools`` hands back, so a turn + can carry a real tool list and the per-harness translation has something to diverge on. + Returns the ``recorded`` dict the usage hook writes into. + """ + recorded = {} + + async def _tools(tools, **_kw): + return ResolvedToolSet( + builtin_names=list(builtins), + tool_callback=tool_callback, + ) + + async def _no_mcp(mcp_servers, **_kw): + return [] + + async def _no_secrets(): + return {} + + monkeypatch.setattr(app, "resolve_tools", _tools) + monkeypatch.setattr(app, "resolve_mcp_servers", _no_mcp) + monkeypatch.setattr(app, "resolve_secrets", _no_secrets) + monkeypatch.setattr(app, "trace_context", lambda: None) + monkeypatch.setattr( + app, "record_usage", lambda usage: recorded.__setitem__("usage", usage) + ) + monkeypatch.setattr(app, "select_backend", lambda selection: backend) + monkeypatch.setattr( + app, "_default_agent_config", lambda: AgentConfig(instructions="x", model="m") + ) + return recorded + + +@pytest.fixture +def patched(monkeypatch, fake_backend): + backend = fake_backend(result=AgentResult(output="echo", usage={"total": 15})) + recorded = _patch_handler(monkeypatch, backend) + return backend, recorded + + +async def _invoke(harness="pi", **agent): + return await app._agent( + messages=[{"role": "user", "content": "hi"}], + parameters={"agent": {"harness": harness, **agent}}, + ) + + +async def test_invoke_returns_assistant_message(patched): + assert await _invoke("pi") == {"role": "assistant", "content": 
"echo"} + + +async def test_invoke_records_usage(patched): + _, recorded = patched + await _invoke("pi") + assert recorded["usage"] == {"total": 15} + + +async def test_invoke_runs_backend_lifecycle(patched): + backend, _ = patched + await _invoke("pi") + assert backend.setup_calls == 1 + assert backend.shutdown_calls == 1 # cleanup() tears the backend down + + +async def test_messages_session_id_reaches_session_config(patched): + backend, _ = patched + + await app._agent( + messages=[{"role": "user", "content": "hi"}], + parameters={"agent": {"harness": "pi"}}, + session_id="sess_request", + ) + + assert backend.created_session_ids == ["sess_request"] + + +async def test_invoke_cross_harness_same_body_divergent_configs( + monkeypatch, fake_backend +): + """The real cross-harness guarantee, exercised through the handler — not stubbed. + + The earlier version of this test pinned a single echoing backend and asserted + ``pi == agenta == claude`` on the echoed constant. That passes no matter how badly the + per-harness translation diverges, because the translation never ran. Here the same turn + runs as pi / agenta / claude against a backend that records the *harness-shaped* config it + receives, so we can assert two distinct things: + + 1. the response body is byte-identical across the three harnesses (the response-layer + guarantee), and + 2. the config that reached the backend boundary diverged exactly as designed — proving + the handler actually drove ``PiHarness`` / ``ClaudeHarness`` / ``AgentaHarness``, + each producing its own config. + + The turn carries a built-in tool (``web_search``) and a ``deny`` policy so the divergence + is observable: Claude drops Pi built-ins and honors the policy; Pi keeps them and forces + ``auto``; Agenta unions the forced tools and ships skills. 
+ """ + backend = fake_backend(result=AgentResult(output="echo", usage={"total": 15})) + _patch_handler(monkeypatch, backend, builtins=["web_search"]) + + bodies = [ + await _invoke(harness, permission_policy="deny") + for harness in ("pi", "agenta", "claude") + ] + pi_body, agenta_body, claude_body = bodies + + # (1) Response-layer guarantee: identical body regardless of harness. + assert ( + pi_body + == agenta_body + == claude_body + == { + "role": "assistant", + "content": "echo", + } + ) + + # (2) The three harness-shaped configs that reached the backend boundary, in call order. + assert len(backend.created_configs) == 3 + pi_cfg, agenta_cfg, claude_cfg = backend.created_configs + pi_wire = pi_cfg.wire_tools() + agenta_wire = agenta_cfg.wire_tools() + claude_wire = claude_cfg.wire_tools() + + # Pi keeps its built-in tool natively and never gates tool use (policy forced to auto, + # the author's `deny` notwithstanding). + assert pi_wire["tools"] == ["web_search"] + assert pi_wire["permissionPolicy"] == "auto" + assert "skills" not in pi_wire + + # Claude has no Pi built-ins (the `web_search` name is dropped) and honors the policy. + assert claude_wire["tools"] == [] + assert claude_wire["permissionPolicy"] == "deny" + assert "skills" not in claude_wire + + # Agenta is Pi-with-an-opinion: it unions the forced tools onto the author's set, forces + # auto like Pi, and ships the forced skills. + assert agenta_wire["tools"] == ["web_search", "read", "bash"] + assert agenta_wire["permissionPolicy"] == "auto" + assert agenta_wire["skills"] == list(AGENTA_FORCED_SKILLS) + + # The configs genuinely differ at the boundary; the body's sameness is not a tautology. 
+ assert pi_wire != claude_wire + assert agenta_wire != pi_wire + + +async def test_stream_tool_resolution_failure_is_raised_before_backend_setup( + monkeypatch, +): + async def _failure(tools, **_kw): + raise GatewayToolResolutionError("gateway unavailable") + + monkeypatch.setattr(app, "resolve_tools", _failure) + monkeypatch.setattr( + app, + "_default_agent_config", + lambda: AgentConfig( + tools=[ + { + "type": "gateway", + "integration": "github", + "action": "GET_USER", + "connection": "c1", + } + ] + ), + ) + monkeypatch.setattr( + app, + "select_backend", + lambda _selection: (_ for _ in ()).throw( + AssertionError("backend must not be selected") + ), + ) + + with pytest.raises(GatewayToolResolutionError, match="gateway unavailable"): + await app._agent( + messages=[{"role": "user", "content": "hi"}], + parameters={"agent": {"harness": "pi"}}, + stream=True, + ) diff --git a/services/oss/tests/pytest/unit/agent/test_secrets_mapping.py b/services/oss/tests/pytest/unit/agent/test_secrets_mapping.py new file mode 100644 index 0000000000..54104e3bb0 --- /dev/null +++ b/services/oss/tests/pytest/unit/agent/test_secrets_mapping.py @@ -0,0 +1,24 @@ +"""Provider-key -> harness env-var mapping. + +The harness authenticates with the project's vault provider keys, injected as the env vars +each provider's SDK reads. If a name here drifts from what the harness expects, auth fails +silently and the run falls back to login/OAuth, so the table is worth a guard. 
+""" + +from __future__ import annotations + +from oss.src.agent.secrets import _PROVIDER_ENV_VARS + + +def test_standard_providers_map_to_expected_env_vars(): + assert _PROVIDER_ENV_VARS["openai"] == "OPENAI_API_KEY" + assert _PROVIDER_ENV_VARS["anthropic"] == "ANTHROPIC_API_KEY" + assert _PROVIDER_ENV_VARS["gemini"] == "GEMINI_API_KEY" + assert _PROVIDER_ENV_VARS["groq"] == "GROQ_API_KEY" + assert _PROVIDER_ENV_VARS["together_ai"] == "TOGETHERAI_API_KEY" + assert _PROVIDER_ENV_VARS["openrouter"] == "OPENROUTER_API_KEY" + + +def test_both_mistral_spellings_share_one_env_var(): + assert _PROVIDER_ENV_VARS["mistral"] == "MISTRAL_API_KEY" + assert _PROVIDER_ENV_VARS["mistralai"] == "MISTRAL_API_KEY" diff --git a/services/oss/tests/pytest/unit/agent/test_select_backend.py b/services/oss/tests/pytest/unit/agent/test_select_backend.py new file mode 100644 index 0000000000..b01e95f104 --- /dev/null +++ b/services/oss/tests/pytest/unit/agent/test_select_backend.py @@ -0,0 +1,68 @@ +"""``select_backend``: the service always uses the sandbox-agent runner backend.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from agenta.sdk.agents import ( + AgentRunnerConfigurationError, + SandboxAgentBackend, + RunSelection, +) + +from oss.src.agent.app import select_backend + + +@pytest.fixture +def runner_wrapper(tmp_path: Path) -> Path: + cli = tmp_path / "src" / "cli.ts" + cli.parent.mkdir() + cli.write_text("console.log('runner')\n", encoding="utf-8") + return tmp_path + + +@pytest.fixture(autouse=True) +def _clean_env(monkeypatch, runner_wrapper: Path): + # Start every case from a known-empty deployment environment. 
+ monkeypatch.delenv("AGENTA_AGENT_RUNNER_URL", raising=False) + monkeypatch.setenv("AGENTA_AGENT_RUNNER_DIR", str(runner_wrapper)) + + +def _sel(harness="pi", sandbox="local"): + return RunSelection(harness=harness, sandbox=sandbox) + + +@pytest.mark.parametrize("harness", ["pi", "agenta", "claude"]) +def test_all_harnesses_use_sandbox_agent_backend(harness): + assert isinstance(select_backend(_sel(harness, "local")), SandboxAgentBackend) + + +def test_non_local_sandbox_is_threaded_through(): + backend = select_backend(_sel("pi", "daytona")) + + assert isinstance(backend, SandboxAgentBackend) + assert backend._sandbox == "daytona" + + +def test_runner_url_selects_http_transport(monkeypatch): + monkeypatch.setenv("AGENTA_AGENT_RUNNER_URL", "http://sandbox-agent:8765") + + backend = select_backend(_sel("pi", "local")) + + assert backend._url == "http://sandbox-agent:8765" + + +def test_no_runner_url_uses_subprocess_transport(): + # Unset URL means the backend will spawn the runner CLI from a local checkout. 
+ assert select_backend(_sel("pi", "local"))._url is None + + +def test_no_runner_url_requires_runner_assets(monkeypatch, tmp_path: Path): + missing_wrapper = tmp_path / "missing-wrapper" + missing_wrapper.mkdir() + monkeypatch.setenv("AGENTA_AGENT_RUNNER_DIR", str(missing_wrapper)) + + with pytest.raises(AgentRunnerConfigurationError, match="src/cli.ts"): + select_backend(_sel("pi", "local")) diff --git a/services/oss/tests/pytest/unit/agent/tools/__init__.py b/services/oss/tests/pytest/unit/agent/tools/__init__.py new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/services/oss/tests/pytest/unit/agent/tools/__init__.py @@ -0,0 +1 @@ + diff --git a/services/oss/tests/pytest/unit/agent/tools/test_gateway_mapping.py b/services/oss/tests/pytest/unit/agent/tools/test_gateway_mapping.py new file mode 100644 index 0000000000..3ca0690980 --- /dev/null +++ b/services/oss/tests/pytest/unit/agent/tools/test_gateway_mapping.py @@ -0,0 +1,19 @@ +from agenta.sdk.agents import GatewayToolConfig + +from oss.src.agent.tools import _to_gateway_reference + + +def test_gateway_reference_uses_canonical_sdk_shape(): + assert _to_gateway_reference( + GatewayToolConfig( + integration="github", + action="GET_USER", + connection="c1", + ) + ) == { + "type": "gateway", + "provider": "composio", + "integration": "github", + "action": "GET_USER", + "connection": "c1", + } diff --git a/services/oss/tests/pytest/unit/agent/tools/test_resolution.py b/services/oss/tests/pytest/unit/agent/tools/test_resolution.py new file mode 100644 index 0000000000..f87fba6c77 --- /dev/null +++ b/services/oss/tests/pytest/unit/agent/tools/test_resolution.py @@ -0,0 +1,90 @@ +from __future__ import annotations + +from typing import Mapping, Sequence + +import pytest + +from agenta.sdk.agents import ( + CodeToolSpec, + MissingMCPSecretError, + MissingToolSecretError, +) + +from oss.src.agent.tools import resolve_mcp_servers, resolve_tools +from oss.src.agent.tools import resolver as 
resolver_module + + +class _FakeSecretProvider: + """A `ToolSecretProvider` that serves canned values and records the names requested.""" + + def __init__(self, values: Mapping[str, str]) -> None: + self.values = dict(values) + self.requests: list[list[str]] = [] + + async def get_many(self, names: Sequence[str]) -> Mapping[str, str]: + self.requests.append(list(names)) + return {name: self.values[name] for name in names if name in self.values} + + +async def test_resolve_tools_builds_local_specs_with_scoped_secrets(): + provider = _FakeSecretProvider({"TOKEN": "secret"}) + resolved = await resolve_tools( + [ + "read", + { + "type": "code", + "name": "calc", + "script": "...", + "secrets": ["TOKEN"], + }, + { + "type": "client", + "name": "pick", + }, + ], + secret_provider=provider, + ) + assert provider.requests == [["TOKEN"]] + assert resolved.builtin_names == ["read"] + code = next(spec for spec in resolved.tool_specs if spec.name == "calc") + assert isinstance(code, CodeToolSpec) + assert code.env == {"TOKEN": "secret"} + assert ( + next(spec for spec in resolved.tool_specs if spec.name == "pick").kind + == "client" + ) + + +async def test_missing_tool_secret_is_not_silently_omitted(): + with pytest.raises(MissingToolSecretError): + await resolve_tools( + [ + { + "type": "code", + "name": "calc", + "script": "...", + "secrets": ["TOKEN"], + } + ], + secret_provider=_FakeSecretProvider({}), + ) + + +async def test_mcp_is_disabled_at_service_composition_by_default(monkeypatch): + monkeypatch.delenv("AGENTA_AGENT_ENABLE_MCP", raising=False) + assert await resolve_mcp_servers([{"name": "github", "command": "npx"}]) == [] + + +async def test_missing_mcp_secret_is_explicit_when_enabled(monkeypatch): + monkeypatch.setattr(resolver_module, "_mcp_enabled", lambda: True) + with pytest.raises(MissingMCPSecretError): + await resolve_mcp_servers( + [ + { + "name": "github", + "command": "npx", + "secrets": {"GITHUB_TOKEN": "missing"}, + } + ], + 
secret_provider=_FakeSecretProvider({}), + )