Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,32 @@ export type ResultMessageHandlerResult = {
};
};

export type AgentErrorClassification =
| "upstream_stream_terminated"
| "upstream_connection_error"
| "agent_error";

/**
* Classify an error string surfaced by the Claude CLI via `is_error: true`
* result messages. Transient upstream-stream terminations (e.g. the fetch body
* from the LLM gateway is torn down mid-stream) are retriable; most other
* errors are not.
*/
export function classifyAgentError(
result: string | undefined,
): AgentErrorClassification {
if (!result) return "agent_error";
const text = result.trim();
// Anthropic SDK surfaces an undici fetch abort as "API Error: terminated".
if (/API Error:\s*terminated\b/i.test(text)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you love a little bit of regex error message matching

return "upstream_stream_terminated";
}
if (/API Error:\s*Connection error\b/i.test(text)) {
return "upstream_connection_error";
}
return "agent_error";
}

export function handleResultMessage(
message: SDKResultMessage,
): ResultMessageHandlerResult {
Expand All @@ -626,9 +652,13 @@ export function handleResultMessage(
return { shouldStop: true, stopReason: "max_tokens", usage };
}
if (message.is_error) {
const classification = classifyAgentError(message.result);
return {
shouldStop: true,
error: RequestError.internalError(undefined, message.result),
error: RequestError.internalError(
{ classification, result: message.result },
message.result,
),
usage,
};
}
Expand Down
55 changes: 52 additions & 3 deletions packages/agent/src/server/agent-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import {
createAcpConnection,
type InProcessAcpConnection,
} from "../adapters/acp-connection";
import {
type AgentErrorClassification,
classifyAgentError,
} from "../adapters/claude/conversion/sdk-to-acp";
import { selectRecentTurns } from "../adapters/claude/session/jsonl-hydration";
import type { PermissionMode } from "../execution-mode";
import { DEFAULT_CODEX_MODEL } from "../gateway-models";
Expand Down Expand Up @@ -949,6 +953,50 @@ export class AgentServer {
await this.sendInitialTaskMessage(payload, preTaskRun);
}

private extractErrorClassification(error: unknown): {
classification: AgentErrorClassification;
message: string;
} {
const message =
error instanceof Error ? error.message : String(error ?? "");

// Prefer the structured `data` carried on RequestError if present.
const data = (error as { data?: unknown } | undefined)?.data;
if (
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks like zod would simplify it

data &&
typeof data === "object" &&
"classification" in data &&
typeof (data as { classification: unknown }).classification === "string"
) {
return {
classification: (data as { classification: AgentErrorClassification })
.classification,
message,
};
}

return { classification: classifyAgentError(message), message };
}

private classifyAndSignalFailure(
payload: JwtPayload,
phase: "initial" | "resume",
error: unknown,
): Promise<void> {
const { classification, message } = this.extractErrorClassification(error);
const errorMessage =
classification === "upstream_stream_terminated"
? "Upstream LLM stream terminated"
: classification === "upstream_connection_error"
? "Upstream LLM connection error"
: message || "Agent error";
this.logger.error(`send_${phase}_task_message_failed`, {
classification,
message,
});
return this.signalTaskComplete(payload, "error", errorMessage);
}

private async sendInitialTaskMessage(
payload: JwtPayload,
prefetchedRun?: TaskRun | null,
Expand Down Expand Up @@ -1061,7 +1109,7 @@ export class AgentServer {
if (this.session) {
await this.session.logWriter.flushAll();
}
await this.signalTaskComplete(payload, "error");
await this.classifyAndSignalFailure(payload, "initial", error);
}
}

Expand Down Expand Up @@ -1150,7 +1198,7 @@ export class AgentServer {
if (this.session) {
await this.session.logWriter.flushAll();
}
await this.signalTaskComplete(payload, "error");
await this.classifyAndSignalFailure(payload, "resume", error);
}
}

Expand Down Expand Up @@ -1461,6 +1509,7 @@ ${attributionInstructions}
private async signalTaskComplete(
payload: JwtPayload,
stopReason: string,
errorMessage?: string,
): Promise<void> {
if (this.session?.payload.run_id === payload.run_id) {
try {
Expand Down Expand Up @@ -1488,7 +1537,7 @@ ${attributionInstructions}
try {
await this.posthogAPI.updateTaskRun(payload.task_id, payload.run_id, {
status,
error_message: stopReason === "error" ? "Agent error" : undefined,
error_message: errorMessage ?? "Agent error",
});
this.logger.info("Task completion signaled", { status, stopReason });
} catch (error) {
Expand Down
124 changes: 124 additions & 0 deletions packages/agent/src/server/question-relay.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { type SetupServerApi, setupServer } from "msw/node";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { classifyAgentError } from "../adapters/claude/conversion/sdk-to-acp";
import type { PostHogAPIClient } from "../posthog-api";
import { createTestRepo, type TestRepo } from "../test/fixtures/api";
import { createPostHogHandlers } from "../test/mocks/msw-handlers";
Expand Down Expand Up @@ -49,7 +50,42 @@ const QUESTION_META = {
],
};

function createTransientPromptError(): Error & {
data: { classification: string; result: string };
} {
const error = new Error("API Error: terminated") as Error & {
data: { classification: string; result: string };
};
error.data = {
classification: "upstream_stream_terminated",
result: "API Error: terminated",
};
return error;
}

function createTransientConnectionError(): Error & {
data: { classification: string; result: string };
} {
const error = new Error("fetch failed") as Error & {
data: { classification: string; result: string };
};
error.data = {
classification: "upstream_connection_error",
result: "fetch failed",
};
return error;
}

describe("Question relay", () => {
it.each([
["API Error: terminated", "upstream_stream_terminated"],
["API Error: Connection error", "upstream_connection_error"],
["something else", "agent_error"],
[undefined, "agent_error"],
])("classifies %p as %s", (message, expected) => {
expect(classifyAgentError(message)).toBe(expected);
});

let repo: TestRepo;
let server: TestableAgentServer;
let mswServer: SetupServerApi;
Expand Down Expand Up @@ -514,5 +550,93 @@ describe("Question relay", () => {
prompt: [{ type: "text", text: "original task description" }],
});
});

it("does not replay a transient upstream termination before any session activity", async () => {
vi.spyOn(server.posthogAPI, "getTask").mockResolvedValue({
id: "test-task-id",
title: "t",
description: "original task description",
} as unknown as Task);
vi.spyOn(server.posthogAPI, "getTaskRun").mockResolvedValue({
id: "test-run-id",
task: "test-task-id",
state: {},
} as unknown as TaskRun);

const promptSpy = vi
.fn()
.mockRejectedValueOnce(createTransientPromptError());
const updateTaskRunSpy = vi
.spyOn(server.posthogAPI, "updateTaskRun")
.mockResolvedValue({} as TaskRun);
server.session = {
payload: TEST_PAYLOAD,
acpSessionId: "acp-session",
clientConnection: { prompt: promptSpy },
logWriter: {
flushAll: vi.fn().mockResolvedValue(undefined),
getFullAgentResponse: vi.fn().mockReturnValue(null),
resetTurnMessages: vi.fn(),
flush: vi.fn().mockResolvedValue(undefined),
isRegistered: vi.fn().mockReturnValue(true),
},
};

await server.sendInitialTaskMessage(TEST_PAYLOAD);

expect(promptSpy).toHaveBeenCalledTimes(1);
expect(updateTaskRunSpy).toHaveBeenCalledWith(
"test-task-id",
"test-run-id",
{
status: "failed",
error_message: "Upstream LLM stream terminated",
},
);
});

it("surfaces upstream connection errors with the connection-specific message", async () => {
vi.spyOn(server.posthogAPI, "getTask").mockResolvedValue({
id: "test-task-id",
title: "t",
description: "original task description",
} as unknown as Task);
vi.spyOn(server.posthogAPI, "getTaskRun").mockResolvedValue({
id: "test-run-id",
task: "test-task-id",
state: {},
} as unknown as TaskRun);

const promptSpy = vi.fn().mockImplementationOnce(async () => {
throw createTransientConnectionError();
});
const updateTaskRunSpy = vi
.spyOn(server.posthogAPI, "updateTaskRun")
.mockResolvedValue({} as TaskRun);
server.session = {
payload: TEST_PAYLOAD,
acpSessionId: "acp-session",
clientConnection: { prompt: promptSpy },
logWriter: {
flushAll: vi.fn().mockResolvedValue(undefined),
getFullAgentResponse: vi.fn().mockReturnValue(null),
resetTurnMessages: vi.fn(),
flush: vi.fn().mockResolvedValue(undefined),
isRegistered: vi.fn().mockReturnValue(true),
},
};

await server.sendInitialTaskMessage(TEST_PAYLOAD);

expect(promptSpy).toHaveBeenCalledTimes(1);
expect(updateTaskRunSpy).toHaveBeenCalledWith(
"test-task-id",
"test-run-id",
{
status: "failed",
error_message: "Upstream LLM connection error",
},
);
});
Comment thread
tatoalo marked this conversation as resolved.
});
});
Loading