feat(agent): Pi-backed agent workflow service, template, and tracing #4758
mmabrouk wants to merge 1 commit into
… docs
- New agent workflow service wrapping the Pi harness, served same-origin like chat/completion at /services/agent/v0 (Python service + Node Pi sidecar, ports/adapters).
- Builtin 'agent' app type + create-app template; config is model + AGENTS.md.
- /inspect chat schema and OpenTelemetry tracing into Agenta.
- EE dev compose agent-pi sidecar; design docs under docs/design/agent-workflows.
📝 Walkthrough
Introduces a new "agent" workflow type backed by the pi.dev coding-agent harness. Adds research documentation, a TypeScript Pi wrapper service (…
Changes
Agent Workflows Feature
Sequence Diagram(s)
sequenceDiagram
rect rgba(70, 130, 180, 0.5)
note over Client, AgentaOTLP: Agent invoke flow
end
participant Client
participant agent_v0_app
participant _agent as _agent (Python)
participant PiHttpHarness
participant server_ts as server.ts (Node)
participant runPi
participant PiSDK
participant AgentaOTLP
Client->>agent_v0_app: POST /agent/v0/invoke
agent_v0_app->>_agent: dispatch
_agent->>_agent: load_config() + _trace_context()
_agent->>PiHttpHarness: invoke(HarnessRequest + traceparent)
PiHttpHarness->>server_ts: POST /run {agentsMd, model, trace}
server_ts->>runPi: runPi(request)
runPi->>runPi: createAgentaOtel(traceparent)
runPi->>PiSDK: createAgentSession(inMemoryConfig)
PiSDK-->>runPi: text_delta events
runPi->>AgentaOTLP: flushTrace → OTLP/HTTP batch (invoke_agent→turn→chat→tool)
runPi-->>server_ts: AgentRunResult {ok, output, traceId}
server_ts-->>PiHttpHarness: JSON response
PiHttpHarness-->>_agent: HarnessResult
_agent-->>Client: assistant message
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~60 minutes
Possibly related PRs
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
| "content-type": "application/json", | ||
| "content-length": Buffer.byteLength(payload), | ||
| }); | ||
| res.end(payload); |
Reviewer guide: interesting code
A map to the hunks that carry the architecture. Start at the first one.
| def create_agent_app(): | ||
| app = ag.create_app() | ||
| # No builtin URI yet: registering the agent as a first-class workflow type |
The decision to scrutinize: the handler registers under an auto user:custom: URI, not the seeded agenta:builtin:agent:v0 builtin type. That keeps this PR off builtin-type resolution, but it leaves the agent schema in two places (schemas.py here and the seeded SDK agent_v0_interface) until WP-6 unifies them. They must stay in sync.
| @dataclass | ||
| class TraceContext: |
The single seam for cross-boundary tracing: TraceContext.to_wire() maps snake_case Python to the camelCase shape the TS wrapper reads. Both Harness adapters (stdio and HTTP) serialize through this, so a field added here must be mirrored in runPi.ts TraceContext.
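The mapping described above can be sketched as follows. This is a minimal illustration, not the PR's code: the field names (`traceparent`, `trace_id`, `parent_span_id`) and the `_to_camel` helper are assumptions made for the example, and the real `TraceContext` may carry different fields.

```python
from dataclasses import dataclass, fields
from typing import Any, Dict, Optional


def _to_camel(name: str) -> str:
    """Convert a snake_case identifier to camelCase (hypothetical helper)."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


@dataclass
class TraceContext:
    # Hypothetical fields for illustration; the real dataclass may differ.
    traceparent: Optional[str] = None
    trace_id: Optional[str] = None
    parent_span_id: Optional[str] = None

    def to_wire(self) -> Dict[str, Any]:
        """Serialize to the camelCase shape a TypeScript reader would expect.

        Unset (None) fields are omitted so the wire payload stays minimal.
        """
        wire: Dict[str, Any] = {}
        for f in fields(self):
            value = getattr(self, f.name)
            if value is not None:
                wire[_to_camel(f.name)] = value
        return wire
```

Deriving the wire keys from the dataclass fields means a newly added field is serialized automatically, which leaves only the TypeScript side to update by hand.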
| const loader = new DefaultResourceLoader({ | ||
| cwd, | ||
| agentDir: getAgentDir(), | ||
| noContextFiles: true, |
Diskless config: noContextFiles: true plus the agentsFilesOverride virtual AGENTS.md inject the instructions in memory and keep on-disk context files out of the run. With the throwaway temp cwd, a run touches no persistent disk. Worth confirming no on-disk fallback path slips context back in.
| spans.push(span); | ||
| this.buffers.set(traceId, spans); | ||
| // No parent in this process => this is the local root and the trace is done. | ||
| if (!span.parentSpanId) { |
Auto-flush fires only when the span has no parent in this process (the local root). When the caller threads a traceparent, the root has a remote parent that never ends here, so onEnd never auto-flushes and the run must call otel.flush(traceId) explicitly. Check the error/timeout path in runPi.ts still reaches that flush, or the trace is dropped.
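The flush behaviour described above can be modelled in a few lines. This is a Python sketch of the idea only; the actual processor is TypeScript, and `Span`, `TraceBuffer`, and `run_with_flush` are hypothetical names introduced for the illustration.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, TypeVar

T = TypeVar("T")


@dataclass
class Span:
    trace_id: str
    span_id: str
    parent_span_id: Optional[str] = None  # set when the parent is remote or local


class TraceBuffer:
    """Buffers ended spans per trace and exports them as one batch."""

    def __init__(self, export: Callable[[List[Span]], None]) -> None:
        self._export = export
        self._buffers: Dict[str, List[Span]] = {}

    def on_end(self, span: Span) -> None:
        self._buffers.setdefault(span.trace_id, []).append(span)
        # Only a span without any parent is treated as the local root.
        # With a remote parent this branch never runs.
        if span.parent_span_id is None:
            self.flush(span.trace_id)

    def flush(self, trace_id: str) -> None:
        spans = self._buffers.pop(trace_id, [])
        if spans:
            self._export(spans)


def run_with_flush(buffer: TraceBuffer, trace_id: str, work: Callable[[], T]) -> T:
    """Run `work` and always flush the trace, including on error paths."""
    try:
        return work()
    finally:
        buffer.flush(trace_id)
```

Placing the explicit flush in a `finally` block is one way to make sure the error and timeout paths export the buffered spans instead of dropping them.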
Actionable comments posted: 18
Note
Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
services/agent/docker/Dockerfile.dev (1)
7-29: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Run the container as a non-root user by default.
No USER is set, so the image defaults to root; this weakens least-privilege posture for the sidecar.
Suggested fix
 FROM node:24-slim
@@
 RUN pnpm install --frozen-lockfile
@@
 COPY tsconfig.json ./
 COPY src ./src
+
+# Drop root privileges for runtime.
+USER node
@@
 CMD ["node_modules/.bin/tsx", "watch", "src/server.ts"]
Source: Linters/SAST tools
🟡 Minor comments (6)
hosting/docker-compose/ee/docker-compose.dev.yml-397-399 (1)
397-399: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Declare the new sidecar dependency for startup ordering.
After introducing AGENTA_AGENT_PI_URL, services should explicitly depend on agent-pi to reduce transient boot-time failures for agent requests.
Suggested change
   services:
@@
     networks:
       - agenta-network
+    depends_on:
+      agent-pi:
+        condition: service_started
docs/design/agent-workflows/README.md-14-16 (1)
14-16: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Broken internal link: "Open research topics" heading not found.
Line 15 references [Open research topics](#open-research-topics), but no section with that ID exists in the document. The heading "## POC work packages" (line 95) appears to be the intended target, or a new section titled "## Open research topics" should be added before the POC work packages.
As written, readers clicking the link will not navigate to the referenced section.
🔧 Proposed fix
Either update the link to point to the correct section:
-The research topics in [Open research topics](#open-research-topics) will be assigned to subagents and
+The research topics below will be assigned to subagents and
Or, rename the POC work packages section to match the link:
-## POC work packages
+## Open research topics (POC work packages)
docs/design/agent-workflows/wp-3-daytona-sandbox/poc/run_agent.py-183-184 (1)
183-184: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Handle missing flag values in arg() to avoid IndexError crashes.
Line 184 assumes every present flag has a value; --auth/--model without a following token crashes with a traceback.
Suggested fix
 def arg(name: str, default: str) -> str:
-    return sys.argv[sys.argv.index(name) + 1] if name in sys.argv else default
+    if name not in sys.argv:
+        return default
+    i = sys.argv.index(name) + 1
+    if i >= len(sys.argv) or sys.argv[i].startswith("--"):
+        raise SystemExit(f"missing value for {name}")
+    return sys.argv[i]
docs/design/agent-workflows/wp-1-pi-tracing/poc/README.md-15-20 (1)
15-20: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Add language identifiers to fenced code blocks.
markdownlint MD040 flags both fences; add explicit languages (for example, text) to keep docs lint-clean.
Also applies to: 67-73
Source: Linters/SAST tools
docs/design/agent-workflows/wp-1-pi-tracing/tracing-in-the-agent-service.md-25-31 (1)
25-31: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Specify languages for fenced code blocks.
markdownlint MD040 is triggered here; add fence languages (for example, text and bash).
Also applies to: 100-102
Source: Linters/SAST tools
docs/design/agent-workflows/wp-1-pi-tracing/poc/run.ts-173-178 (1)
173-178: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Reset runConfig.traceId before each prompt to avoid stale IDs.
The loop can push a previous trace id when a prompt iteration does not produce a fresh one.
🔧 Suggested change
 const traceIds: string[] = [];
 for (let i = 0; i < scenario.prompts.length; i++) {
+  runConfig.traceId = undefined;
   const p = scenario.prompts[i];
   console.log(`\n[run] prompt ${i + 1}/${scenario.prompts.length}: ${p}\n`);
   await session.prompt(p);
   if (runConfig.traceId) traceIds.push(runConfig.traceId);
 }
🧹 Nitpick comments (2)
docs/design/agent-workflows/wp-1-pi-tracing/integrating-the-tracing-extension.md (1)
23-28: ⚡ Quick win
Add language specifiers to three fenced code blocks.
Three code blocks are missing language identifiers, which triggers markdown linting warnings (MD040):
- Lines 23–28 (span tree structure)
- Lines 147–155 (dependencies)
- Lines 161–163 (curl verification command)
Add language specifiers
-``` +``` invoke_agent openinference.span.kind = AGENT (root, one per user prompt) turn N CHAIN chat <model> LLM model, latency, token usage, finish reason, messages execute_tool <name> TOOL args in, result out -``` +``` -``` +``` `@earendil-works/pi-coding-agent` 0.79.4 `@opentelemetry/api` 1.9.0 `@opentelemetry/exporter-trace-otlp-proto` 0.54.0 @@ -150,7 +152,7 @@ `@opentelemetry/sdk-trace-node` 1.28.0 `@opentelemetry/semantic-conventions` 1.28.0 -``` +``` -``` +``` curl -s "${AGENTA_HOST}/api/spans/?trace_id=<id>" -H "Authorization: ApiKey ${AGENTA_API_KEY}" -``` +```The first block is a diagram (use a code fence without a language or add a comment prefix like
text), the second is a version list (add text or plaintext), and the third is a bash command (add bash).
Also applies to: 147-155, 161-163
services/agent/scripts/register_agent_app.py (1)
44-45: ⚖️ Poor tradeoff
Schema definitions must remain in sync with services/oss/src/agent_pi/schemas.py.
The AGENT_SCHEMAS defined here (lines 47–72) are registered as the chat interface for the agent app. The comment on lines 44–45 notes they must be kept in sync with the Python agent service's schema definitions. If these diverge, the playground will render a mismatched schema, breaking the chat UI.
Consider adding a check or docstring reminder in both files to surface any drift early, or extracting the schema to a shared definition (e.g., a JSON/YAML file committed to both locations).
Also applies to: 47-72
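One way to surface drift early is a fingerprint comparison that a test or startup check can run. This is a rough sketch under assumptions: `schema_fingerprint` and `assert_schemas_in_sync` are hypothetical helpers, and the two schema dictionaries would need to be imported from wherever each copy actually lives.

```python
import hashlib
import json
from typing import Any, Dict


def schema_fingerprint(schema: Dict[str, Any]) -> str:
    """Hash a JSON-serializable schema in a key-order-independent way."""
    canonical = json.dumps(schema, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def assert_schemas_in_sync(
    registered: Dict[str, Any], service: Dict[str, Any]
) -> None:
    """Raise if the two schema copies differ (hypothetical drift check)."""
    if schema_fingerprint(registered) != schema_fingerprint(service):
        raise AssertionError(
            "agent schemas have drifted: update both copies together"
        )
```

Comparing canonical JSON rather than the raw dictionaries keeps the check insensitive to key ordering, so only a real change in content trips it.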
ℹ️ Review info
⛔ Files ignored due to path filters (2)
- docs/design/agent-workflows/wp-1-pi-tracing/poc/pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
- services/agent/pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (59)
- docs/design/agent-workflows/README.md
- docs/design/agent-workflows/research/auth-secrets.md
- docs/design/agent-workflows/research/daytona-sandbox.md
- docs/design/agent-workflows/research/diskless-in-memory-config.md
- docs/design/agent-workflows/research/open-questions.md
- docs/design/agent-workflows/research/otel-instrumentation.md
- docs/design/agent-workflows/research/pi-interaction.md
- docs/design/agent-workflows/research/sandbox-sharing.md
- docs/design/agent-workflows/wp-1-pi-tracing/README.md
- docs/design/agent-workflows/wp-1-pi-tracing/integrating-the-tracing-extension.md
- docs/design/agent-workflows/wp-1-pi-tracing/poc/.env.example
- docs/design/agent-workflows/wp-1-pi-tracing/poc/README.md
- docs/design/agent-workflows/wp-1-pi-tracing/poc/agenta-otel.ts
- docs/design/agent-workflows/wp-1-pi-tracing/poc/package.json
- docs/design/agent-workflows/wp-1-pi-tracing/poc/run.ts
- docs/design/agent-workflows/wp-1-pi-tracing/tracing-in-the-agent-service.md
- docs/design/agent-workflows/wp-2-agent-service/README.md
- docs/design/agent-workflows/wp-2-agent-service/implementation-plan.md
- docs/design/agent-workflows/wp-3-daytona-sandbox/README.md
- docs/design/agent-workflows/wp-3-daytona-sandbox/poc/README.md
- docs/design/agent-workflows/wp-3-daytona-sandbox/poc/bench_coldstart.py
- docs/design/agent-workflows/wp-3-daytona-sandbox/poc/build_snapshot.py
- docs/design/agent-workflows/wp-3-daytona-sandbox/poc/cleanup.py
- docs/design/agent-workflows/wp-3-daytona-sandbox/poc/run_agent.py
- docs/design/agent-workflows/wp-4-multi-message-output/README.md
- docs/design/agent-workflows/wp-5-chat-vs-completion/README.md
- docs/design/agent-workflows/wp-6-workflow-type-and-template/README.md
- docs/design/agent-workflows/wp-7-tools/README.md
- hosting/docker-compose/ee/docker-compose.dev.yml
- sdks/python/agenta/sdk/engines/running/interfaces.py
- sdks/python/agenta/sdk/engines/running/utils.py
- services/agent/.dockerignore
- services/agent/README.md
- services/agent/config/AGENTS.md
- services/agent/config/agent.json
- services/agent/docker-compose.agent.yml
- services/agent/docker-compose.stack.yml
- services/agent/docker/Dockerfile.dev
- services/agent/package.json
- services/agent/scripts/register_agent_app.py
- services/agent/src/agenta-otel.ts
- services/agent/src/cli.ts
- services/agent/src/runPi.ts
- services/agent/src/server.ts
- services/agent/tsconfig.json
- services/entrypoints/agent_main.py
- services/entrypoints/main.py
- services/oss/src/agent.py
- services/oss/src/agent_pi/__init__.py
- services/oss/src/agent_pi/config.py
- services/oss/src/agent_pi/local_runtime.py
- services/oss/src/agent_pi/pi_harness.py
- services/oss/src/agent_pi/pi_http_harness.py
- services/oss/src/agent_pi/ports.py
- services/oss/src/agent_pi/schemas.py
- web/oss/src/components/pages/app-management/components/CreateAppDropdown/index.tsx
- web/oss/src/components/pages/app-management/modals/CreateAppTypeModal/index.tsx
- web/oss/src/components/pages/prompts/assets/iconHelpers.tsx
- web/packages/agenta-entities/src/workflow/state/appUtils.ts
| - Caveat: the existing map uses the older `gen_ai.usage.prompt_tokens` / | ||
| `completion_tokens` names. The Pi extensions emit the newer | ||
| `gen_ai.usage.input_tokens` / `output_tokens`. Those newer keys are **not** | ||
| in `semconv.py` yet, so token metrics from Pi would be dropped until we add | ||
| the two aliases. (Verified by reading `semconv.py` — only `prompt_tokens` / | ||
| `completion_tokens` / `total_tokens` are present.) |
Token attribute mismatch: Pi extensions emit newer semconv keys not yet mapped in Agenta.
Lines 301-306 note that existing semconv.py mappings handle only gen_ai.usage.prompt_tokens|completion_tokens|total_tokens, but Pi extensions emit gen_ai.usage.input_tokens|output_tokens. Relevant code context confirms api/oss/src/apis/fastapi/otlp/opentelemetry/semconv.py lacks mappings for the newer attribute names.
Impact: when Pi extensions export spans over OTLP to Agenta's /otlp/v1/traces, token usage metrics will be silently dropped unless semconv.py is updated to include:
- gen_ai.usage.input_tokens -> ag.metrics.unit.tokens.prompt
- gen_ai.usage.output_tokens -> ag.metrics.unit.tokens.completion
This must be addressed before WP-1 (Pi OTel tracing extension) can be fully integrated.
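The alias idea can be sketched as a lookup table. This is an illustration only and does not reflect how `semconv.py` is actually structured: `TOKEN_ALIASES` and `map_token_attributes` are hypothetical names, and the targets for the older `prompt_tokens` / `completion_tokens` keys are assumed to match the two targets named in the comment above.

```python
from typing import Any, Dict, Mapping

# Hypothetical alias table. The two newer keys and their targets come from
# the review comment; the targets for the older keys are an assumption.
TOKEN_ALIASES: Dict[str, str] = {
    "gen_ai.usage.prompt_tokens": "ag.metrics.unit.tokens.prompt",
    "gen_ai.usage.completion_tokens": "ag.metrics.unit.tokens.completion",
    "gen_ai.usage.input_tokens": "ag.metrics.unit.tokens.prompt",
    "gen_ai.usage.output_tokens": "ag.metrics.unit.tokens.completion",
}


def map_token_attributes(attributes: Mapping[str, Any]) -> Dict[str, Any]:
    """Translate known token-usage attributes and ignore everything else.

    If both the older and the newer key are present, the first one found in
    TOKEN_ALIASES order wins, so the result is deterministic.
    """
    mapped: Dict[str, Any] = {}
    for source_key, target_key in TOKEN_ALIASES.items():
        if source_key in attributes:
            mapped.setdefault(target_key, attributes[source_key])
    return mapped
```

With both spellings in one table, spans from older instrumentation and from the Pi extensions land on the same metric keys.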
| AGENTA_HOST=http://144.76.237.122:8280/ | ||
| AGENTA_API_KEY=your-agenta-project-api-key |
Avoid a real shared collector endpoint in the checked-in example config.
The default AGENTA_HOST points to a concrete external host, which can cause accidental telemetry/content leakage when someone runs the POC with defaults.
🔧 Suggested change
-AGENTA_HOST=http://144.76.237.122:8280/
+AGENTA_HOST=http://localhost:8280/
 AGENTA_API_KEY=your-agenta-project-api-key
🧰 Tools
🪛 dotenv-linter (4.0.0)
[warning] 3-3: [UnorderedKey] The AGENTA_API_KEY key should go before the AGENTA_HOST key
(UnorderedKey)
| this.exporter.export(orderParentFirst(spans), () => {}); | ||
| } |
Do not swallow OTLP export failures.
Both export callbacks at line 66 and lines 73–75 ignore the result, and forceFlush() resolves even when export fails. This masks telemetry loss as success.
🔧 Suggested change
+import { ExportResultCode } from "@opentelemetry/core";
@@
- this.exporter.export(orderParentFirst(spans), () => {});
+ this.exporter.export(orderParentFirst(spans), (result) => {
+ if (result.code !== ExportResultCode.SUCCESS) {
+ console.warn("[agenta-otel] span export failed", result.error);
+ }
+ });
@@
- return new Promise((resolve) =>
- this.exporter.export(orderParentFirst(leftovers), () => resolve()),
- );
+ return new Promise((resolve, reject) =>
+ this.exporter.export(orderParentFirst(leftovers), (result) => {
+ if (result.code === ExportResultCode.SUCCESS) resolve();
+ else reject(result.error ?? new Error("OTLP export failed"));
+ }),
+ );| pi.on("tool_execution_start", async (event: any) => { | ||
| const parent = currentTurn?.ctx ?? agentCtx ?? context.active(); | ||
| const name = event?.toolName ? `execute_tool ${event.toolName}` : "execute_tool"; | ||
| const span = t.startSpan(name, undefined, parent); | ||
| span.setAttribute("openinference.span.kind", "TOOL"); | ||
| span.setAttribute("gen_ai.operation.name", "execute_tool"); | ||
| if (event?.toolName) span.setAttribute("gen_ai.tool.name", event.toolName); | ||
| if (event?.toolCallId) span.setAttribute("gen_ai.tool.call.id", event.toolCallId); | ||
| setInputs(span, (event?.args as Record<string, unknown>) ?? {}); | ||
| if (event?.toolCallId) toolSpans.set(event.toolCallId, span); | ||
| }); | ||
|
|
||
| pi.on("tool_execution_end", async (event: any) => { | ||
| const span = event?.toolCallId ? toolSpans.get(event.toolCallId) : undefined; | ||
| if (!span) return; | ||
| setOutput(span, toolResultText(event?.result)); | ||
| if (event?.isError) span.setStatus({ code: SpanStatusCode.ERROR }); | ||
| span.end(); | ||
| toolSpans.delete(event.toolCallId); | ||
| }); |
Close in-flight tool spans on turn end as a safety net.
llmSpan has a fallback close in turn_end, but tool spans do not. If a tool_execution_end event is missed, tool spans remain open and are never exported.
🔧 Suggested change
pi.on("turn_end", async (event: any) => {
@@
if (currentTurn) {
currentTurn.span.end();
currentTurn = undefined;
}
+ for (const span of toolSpans.values()) {
+ span.setStatus({
+ code: SpanStatusCode.ERROR,
+ message: "tool_execution_end not received before turn_end",
+ });
+ span.end();
+ }
+ toolSpans.clear();
});
Also applies to: 383-395
| await sandbox.process.get_session_command_logs_async( | ||
| session_id, | ||
| cmd_id, | ||
| collector.feed_stdout, | ||
| collector.feed_stderr, | ||
| ) |
Add a timeout guard to live log streaming so teardown always progresses.
Line 276 awaits streaming logs with no upper bound; if the session stalls, the coroutine can hang forever and never reach sandbox deletion.
Suggested fix
+    stream_timeout_s = int(os.environ.get("PI_STREAM_TIMEOUT_S", "900"))
-    await sandbox.process.get_session_command_logs_async(
-        session_id,
-        cmd_id,
-        collector.feed_stdout,
-        collector.feed_stderr,
-    )
+    try:
+        await asyncio.wait_for(
+            sandbox.process.get_session_command_logs_async(
+                session_id,
+                cmd_id,
+                collector.feed_stdout,
+                collector.feed_stderr,
+            ),
+            timeout=stream_timeout_s,
+        )
+    except asyncio.TimeoutError:
+        collector.error = f"stream timeout after {stream_timeout_s}s"
+        log(f"\n [error] {collector.error}")
+        raise
| async function readBody(req: IncomingMessage): Promise<string> { | ||
| const chunks: Buffer[] = []; | ||
| for await (const chunk of req) { | ||
| chunks.push(chunk as Buffer); | ||
| } | ||
| return Buffer.concat(chunks).toString("utf8"); | ||
| } |
Cap /run request body size to prevent memory exhaustion.
readBody() buffers the full payload without a limit, so a large request can exhaust memory.
Suggested fix
-async function readBody(req: IncomingMessage): Promise<string> {
+async function readBody(req: IncomingMessage, maxBytes = 1_000_000): Promise<string> {
const chunks: Buffer[] = [];
+ let total = 0;
for await (const chunk of req) {
- chunks.push(chunk as Buffer);
+ const buf = chunk as Buffer;
+ total += buf.length;
+ if (total > maxBytes) {
+ throw new Error("Request body too large");
+ }
+ chunks.push(buf);
}
return Buffer.concat(chunks).toString("utf8");
 }
| } catch (err) { | ||
| const message = err instanceof Error ? err.stack ?? err.message : String(err); | ||
| return send(res, 500, { ok: false, error: message }); |
Do not return stack traces to clients.
The 500 response currently includes err.stack, which exposes internal paths/implementation details over the API.
Suggested fix
} catch (err) {
- const message = err instanceof Error ? err.stack ?? err.message : String(err);
- return send(res, 500, { ok: false, error: message });
+ const message = err instanceof Error ? err.stack ?? err.message : String(err);
+ process.stderr.write(`[pi-wrapper] server error: ${message}\n`);
+ return send(res, 500, { ok: false, error: "Internal server error" });
}
 });
Source: Linters/SAST tools
| if meta_path.exists(): | ||
| meta = json.loads(meta_path.read_text(encoding="utf-8")) | ||
| model = meta.get("model") or DEFAULT_MODEL | ||
| tools = meta.get("tools", []) or [] |
Handle invalid agent.json defensively to prevent request-time failures.
At Line 64, malformed JSON (or non-object JSON) will raise and break every call path that loads config; Line 65/Line 66 also assume the parsed shape matches dict/List[str]. Validate parse + types before constructing AgentConfig.
Suggested patch
 def load_config() -> AgentConfig:
@@
     model: str = DEFAULT_MODEL
     tools: List[str] = []
     meta_path = base / "agent.json"
     if meta_path.exists():
-        meta = json.loads(meta_path.read_text(encoding="utf-8"))
-        model = meta.get("model") or DEFAULT_MODEL
-        tools = meta.get("tools", []) or []
+        try:
+            parsed = json.loads(meta_path.read_text(encoding="utf-8"))
+        except json.JSONDecodeError:
+            parsed = {}
+
+        meta = parsed if isinstance(parsed, dict) else {}
+        raw_model = meta.get("model")
+        if isinstance(raw_model, str) and raw_model:
+            model = raw_model
+
+        raw_tools = meta.get("tools", [])
+        if isinstance(raw_tools, list):
+            tools = [tool for tool in raw_tools if isinstance(tool, str)]
     return AgentConfig(agents_md=agents_md, model=model, tools=tools)
| proc = await asyncio.create_subprocess_exec( | ||
return AgentConfig(agents_md=agents_md, model=model, tools=tools)| proc = await asyncio.create_subprocess_exec( | ||
| *command, | ||
| cwd=cwd, | ||
| env=env, | ||
| stdin=asyncio.subprocess.PIPE, | ||
| stdout=asyncio.subprocess.PIPE, | ||
| stderr=asyncio.subprocess.PIPE, | ||
| ) | ||
|
|
||
| try: | ||
| stdout, stderr = await asyncio.wait_for( | ||
| proc.communicate(input=input_bytes), | ||
| timeout=timeout, | ||
| ) | ||
| except asyncio.TimeoutError: | ||
| proc.kill() | ||
| await proc.wait() | ||
| raise RuntimeError( | ||
| f"Harness process timed out after {timeout}s: {' '.join(command)}" | ||
| ) |
Timeout cleanup only kills the parent process, not the whole subprocess tree.
At Line 49, proc.kill() targets only the wrapper PID; if it spawns children (e.g., Node spawning other processes), they can outlive the request and accumulate over time, causing resource leaks.
The suggested approach using start_new_session=True and os.killpg() is POSIX-only and will raise errors on Windows. A cross-platform fix requires conditional logic:
- POSIX (Linux/macOS): start_new_session=True + os.killpg(proc.pid, signal.SIGTERM)
- Windows: creationflags=subprocess.CREATE_NEW_PROCESS_GROUP + proc.send_signal(signal.CTRL_BREAK_EVENT)
This should be wrapped in a platform check using sys.platform to ensure compatibility across deployment environments.
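A minimal sketch of that platform check, assuming the wrapper is launched through asyncio. The helper names `group_spawn_kwargs`, `kill_process_group`, and `run_with_timeout` are hypothetical and do not come from this PR; the real `LocalRuntime` may be structured differently.

```python
import asyncio
import os
import signal
import subprocess
import sys


def group_spawn_kwargs() -> dict:
    """Keyword arguments that place the child in its own process group."""
    if sys.platform == "win32":
        return {"creationflags": subprocess.CREATE_NEW_PROCESS_GROUP}
    return {"start_new_session": True}


def kill_process_group(proc: asyncio.subprocess.Process) -> None:
    """Signal the whole group of a child started with group_spawn_kwargs()."""
    if proc.returncode is not None:
        return  # already reaped, nothing left to signal
    try:
        if sys.platform == "win32":
            proc.send_signal(signal.CTRL_BREAK_EVENT)
        else:
            # With start_new_session=True the child's PID is also its PGID.
            os.killpg(proc.pid, signal.SIGKILL)
    except ProcessLookupError:
        pass  # the process exited between the check and the signal


async def run_with_timeout(argv: list[str], timeout: float) -> int:
    """Run argv and kill its whole process group if it exceeds the timeout."""
    proc = await asyncio.create_subprocess_exec(*argv, **group_spawn_kwargs())
    try:
        return await asyncio.wait_for(proc.wait(), timeout)
    except asyncio.TimeoutError:
        kill_process_group(proc)
        return await proc.wait()
```

On Windows, CTRL_BREAK_EVENT asks the group to stop rather than force-killing it, so a follow-up proc.kill() may still be needed there; that part is an assumption to verify on the target platform.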
    result = await self._runtime.exec(
        self._command,
        payload,
        cwd=self._wrapper_dir,
        env={**os.environ},
        timeout=self._timeout,
    )

    if not result.stdout.strip():
        raise RuntimeError(
            "Pi wrapper returned no output. "
            f"exit={result.code} stderr={result.stderr[-2000:]}"
        )

    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError as exc:
        raise RuntimeError(
            "Pi wrapper returned invalid JSON. "
            f"stdout={result.stdout[:500]} stderr={result.stderr[-1000:]}"
        ) from exc

    if not data.get("ok"):
        raise RuntimeError(f"Pi run failed: {data.get('error')}")
Non-zero wrapper exit codes can be treated as successful runs.
At Line 55 onward, success is decided only from parsed JSON. Add an explicit result.code != 0 failure gate so crashed/erroring wrapper processes cannot return false success.
Suggested fix
     result = await self._runtime.exec(
         self._command,
         payload,
         cwd=self._wrapper_dir,
         env={**os.environ},
         timeout=self._timeout,
     )
    +if result.code != 0:
    +    raise RuntimeError(
    +        "Pi wrapper exited with non-zero status. "
    +        f"exit={result.code} stderr={result.stderr[-2000:]}"
    +    )
    +
     if not result.stdout.strip():
         raise RuntimeError(
             "Pi wrapper returned no output. "
             f"exit={result.code} stderr={result.stderr[-2000:]}"
         )
Superseded. Replacing the path-based stack with PRs sliced by functional area showing final code only, so reviewers don't comment on intermediate scaffolding that a later PR rewrites. See the new set.
This PR is part of a stack. Review bottom-up.
Each PR's diff is only its own delta. Merge from the bottom. This PR's base is `main`.

## Context

This PR adds a Pi-backed agent that runs as a normal Agenta workflow, alongside chat and completion. A request hits `/invoke`, the handler runs one cold agent turn through a TypeScript Pi wrapper, and the final assistant message comes back. It is the bottom of the agent-workflows stack (base `main`) and seeds the slices that follow in `docs/design/agent-workflows/pr-stack.md`: the protocol shell (#2), runner streaming (#3), and the agent template (#4).

## What this changes

The agent registers like any other workflow. `create_agent_app()` calls `ag.create_app()` + `ag.workflow(schemas=...)` + `ag.route()`, so the backend and playground treat it as a workflow with `/invoke` and `/inspect`. The handler reads config (model + AGENTS.md) from `parameters`, falls back to file config, builds the user turn, and runs it through a `Harness` port.

The service stays harness-agnostic and environment-agnostic behind two ports in `services/oss/src/agent_pi/ports.py`:

- `Harness` is the seam to the agent engine. `PiHarness` drives the TS wrapper over JSON-on-stdio. `PiHttpHarness` calls the same wrapper as an HTTP sidecar. Same port, two transports.
- `Runtime` is the seam to the run environment. `LocalRuntime` runs the wrapper as a subprocess. A Daytona adapter slots in later behind the same port.

`_build_harness()` picks the transport from the environment: `AGENTA_AGENT_PI_URL` set means the HTTP sidecar (the Python container has no Node), unset means a local subprocess.

The TS wrapper (`services/agent/src/runPi.ts`) injects AGENTS.md in memory, runs an in-memory session in a throwaway temp dir, and writes one JSON result to stdout. Nothing invocation-specific touches a persistent disk.

Tracing threads the caller's trace context across the Python/Node boundary. The handler captures the active `/invoke` span's `traceparent` and passes it down. The wrapper starts `invoke_agent` as a child of that remote span, so the agent's whole run nests under the response trace the way chat and completion nest their LLM spans.

The SDK seeds the builtin interface and catalog template for `agenta:builtin:agent:v0` (model + agents_md, messages-in, message-out), and flips the agent registry entry from evaluator-only to application:

    - ("builtin", "agent"): (False, True, False)  # evaluator-only
    + ("builtin", "agent"): (True, False, False)  # application

The running handler does not register under that builtin URI yet. It takes an auto `user:custom:...` URI. Wiring the handler to the builtin type is deferred (the comment in `agent.py` points at WP-6). The frontend adds "Agent" to the create-app menus, a robot icon, and treats the agent as a chat-mode app (`is_chat: type === "chat" || type === "agent"`).

Most of the line count is design docs and POC artifacts under `docs/design/agent-workflows/`. The running code is small.

## Key architectural decision to review

1. Register through `ag.route`, not as a builtin workflow type (yet). `services/oss/src/agent.py`. The agent ships as a `user:custom:` workflow now, while the SDK seeds the `agenta:builtin:agent:v0` interface for later. The win is that this PR delivers a working agent without touching builtin-type resolution; the cost is two sources of truth for the agent schema (the live `schemas.py` and the seeded SDK interface) that must stay in sync until WP-6 unifies them. Check that the seeded SDK interface and `schemas.py` agree on defaults and shape.

2. Cross-boundary trace nesting. `services/oss/src/agent_pi/ports.py` (`TraceContext`) and `services/agent/src/agenta-otel.ts` (`TraceBatchProcessor`). The handler ships its `traceparent`, OTLP endpoint, and `Authorization` into the wrapper, which makes `invoke_agent` a child of the remote span. Because that root parent never ends inside the Node process, root-span-end cannot trigger the export. So the wrapper buffers a trace's spans and flushes them by trace id after the run. Look at whether a failed or timed-out run still flushes (or intentionally drops the trace), since the export hangs off the run's happy path in `runPi.ts`.

## How to review this PR

1. Open `services/oss/src/agent.py` first. It is the whole control flow in one file: build the turn, pick the harness, invoke, return one assistant message. Read `_build_harness`, `_trace_context`, and `create_agent_app`.
2. Read `services/oss/src/agent_pi/ports.py` to see the two ports and `TraceContext`. The adapters (`pi_harness.py`, `pi_http_harness.py`, `local_runtime.py`) are thin once the ports are clear.
3. Read `services/agent/src/runPi.ts` for the Pi run: in-memory AGENTS.md, temp cwd, stream accumulation, flush-by-trace-id at the end.
4. Skim `services/agent/src/agenta-otel.ts` only for `TraceBatchProcessor` (buffer + flush). The span-attribute code below it is ported unchanged from the WP-1 POC.
5. Skip the `docs/design/agent-workflows/**` docs and POC files unless you want the background; they do not run in production.
6. Most likely regression: the dual schema source. If `schemas.py` defaults drift from the seeded SDK `agent_v0_interface`, the playground pre-fill and the runtime fallback disagree. The other thing to watch is the trace flush on the error path.

## Tests / notes

`/invoke` was verified live with curl through the standalone `entrypoints/agent_main.py` runner (auth bypassed for the curl path), and the agent run was confirmed to nest under the `/invoke` span in the same trace. The wrapper needs Pi auth (`~/.pi/agent/auth.json`) or `OPENAI_API_KEY` / `ANTHROPIC_API_KEY`. The model default is `gpt-5.5`. Tools, streaming, multi-message output, and the Daytona runtime are later work packages in this stack.
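
For reviewers who want the transport selection from "What this changes" in one place, here is a rough sketch of the port-plus-two-adapters shape. It is illustrative only: the class bodies are stubs, the `invoke` signature and the `["node", "wrapper.js"]` command are assumptions, and the real `Harness`, `PiHarness`, `PiHttpHarness`, and `_build_harness()` in this PR may differ. Only the `AGENTA_AGENT_PI_URL` rule is taken from the description.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Protocol


class Harness(Protocol):
    """Port to the agent engine (assumed signature, simplified)."""

    async def invoke(self, request: dict) -> dict: ...


@dataclass
class SubprocessHarness:
    """Stub for the JSON-on-stdio transport; the command is hypothetical."""
    command: list[str]

    async def invoke(self, request: dict) -> dict:
        raise NotImplementedError("stub: would pipe the request over stdio")


@dataclass
class HttpHarness:
    """Stub for the HTTP sidecar transport."""
    base_url: str

    async def invoke(self, request: dict) -> dict:
        raise NotImplementedError("stub: would POST the request to the sidecar")


def build_harness(env: Mapping[str, str] = os.environ) -> Harness:
    """Pick the transport: URL set means sidecar, unset means subprocess."""
    url = env.get("AGENTA_AGENT_PI_URL")
    if url:
        return HttpHarness(base_url=url.rstrip("/"))
    return SubprocessHarness(command=["node", "wrapper.js"])
```

Passing the environment in as a mapping keeps the selection testable without mutating `os.environ`; the handler only ever sees the `Harness` port.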