diff --git a/apps/api/Dockerfile b/apps/api/Dockerfile index d319fc9b4..5016c1b10 100644 --- a/apps/api/Dockerfile +++ b/apps/api/Dockerfile @@ -4,7 +4,7 @@ FROM node:${NODE_VERSION}-alpine RUN apk add --no-cache libc6-compat python3 make g++ curl ENV PNPM_HOME="/pnpm" -ENV PATH="$PNPM_HOME:$PATH" +ENV PATH="$PNPM_HOME/bin:$PATH" RUN npm install -g turbo RUN npm install -g corepack@latest diff --git a/apps/api/src/routes/argoworkflow/__tests__/workflow.test.ts b/apps/api/src/routes/argoworkflow/__tests__/workflow.test.ts new file mode 100644 index 000000000..7250fb964 --- /dev/null +++ b/apps/api/src/routes/argoworkflow/__tests__/workflow.test.ts @@ -0,0 +1,22 @@ +import { describe, expect, it } from "vitest"; +import { JobStatus } from "@ctrlplane/validators/jobs"; + +import { mapTriggerToStatus } from "../workflow.js"; + +describe("mapTriggerToStatus", () => { + it.each([ + ["Pending", JobStatus.Pending], + ["Running", JobStatus.InProgress], + ["Succeeded", JobStatus.Successful], + ["Failed", JobStatus.Failure], + ["Error", JobStatus.Failure], + ])("maps Argo phase %s to %s", (phase, expected) => { + expect(mapTriggerToStatus(phase)).toBe(expected); + }); + + it("returns null for unknown phases", () => { + expect(mapTriggerToStatus("Skipped")).toBeNull(); + expect(mapTriggerToStatus("")).toBeNull(); + expect(mapTriggerToStatus("succeeded")).toBeNull(); + }); +}); diff --git a/apps/api/src/routes/argoworkflow/index.ts b/apps/api/src/routes/argoworkflow/index.ts index d211eeec0..b49b62958 100644 --- a/apps/api/src/routes/argoworkflow/index.ts +++ b/apps/api/src/routes/argoworkflow/index.ts @@ -5,7 +5,9 @@ import { Router } from "express"; import { eq } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import * as schema from "@ctrlplane/db/schema"; +import { logger } from "@ctrlplane/logger"; +import type { ArgoWorkflowPayload } from "./workflow.js"; import { handleArgoWorkflow } from "./workflow.js"; export const createArgoWorkflowRouter = (): 
Router => @@ -46,7 +48,15 @@ const handleWebhookRequest = async (req: Request, res: Response) => { return; } - const payload = req.body; + const payload = req.body as ArgoWorkflowPayload; + logger.info("Argo webhook received", { + jobAgentId: id, + workflowName: payload.workflowName, + uid: payload.uid, + phase: payload.phase, + jobId: payload.jobId, + eventType: payload.eventType, + }); await handleArgoWorkflow(payload); res.status(200).send(); }; diff --git a/apps/api/src/routes/argoworkflow/workflow.ts b/apps/api/src/routes/argoworkflow/workflow.ts index 08c392055..2017bc1d5 100644 --- a/apps/api/src/routes/argoworkflow/workflow.ts +++ b/apps/api/src/routes/argoworkflow/workflow.ts @@ -1,10 +1,11 @@ -import { eq } from "@ctrlplane/db"; +import { and, eq, notInArray } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import { enqueueAllReleaseTargetsDesiredVersion } from "@ctrlplane/db/reconcilers"; import * as schema from "@ctrlplane/db/schema"; +import { logger } from "@ctrlplane/logger"; import { exitedStatus, JobStatus } from "@ctrlplane/validators/jobs"; -interface ArgoWorkflowPayload { +export interface ArgoWorkflowPayload { workflowName: string; namespace: string; uid: string; @@ -16,9 +17,14 @@ interface ArgoWorkflowPayload { eventType: string; } +// Argo Workflow phases: Pending | Running | Succeeded | Failed | Error. +// Error covers controller/infra failures (timeouts, unschedulable pods, exit +// handler crashes); ctrlplane has no separate enum value, so it folds into +// Failure alongside user-code Failed. const statusMap: Record<string, JobStatus> = { Succeeded: JobStatus.Successful, Failed: JobStatus.Failure, + Error: JobStatus.Failure, Running: JobStatus.InProgress, Pending: JobStatus.Pending, }; @@ -26,21 +32,40 @@ const statusMap: Record<string, JobStatus> = { export const mapTriggerToStatus = (trigger: string): JobStatus | null => statusMap[trigger] ?? null; -export const getJobId = (payload: ArgoWorkflowPayload) => payload.jobId ?? 
payload.workflowName; +export const getJobId = (payload: ArgoWorkflowPayload) => payload.jobId; export const handleArgoWorkflow = async (payload: ArgoWorkflowPayload) => { - const { uid, phase, startedAt, finishedAt } = payload; + const { uid, phase, startedAt, finishedAt, workflowName } = payload; const jobId = getJobId(payload); + if (jobId == null) { + logger.warn("Argo webhook missing job-id label, ignoring", { + workflowName, + uid, + phase, + }); + return; + } - const status = statusMap[phase] ?? null; - if (status == null) return; + const status = mapTriggerToStatus(phase); + if (status == null) { + logger.warn("Argo webhook with unmapped phase, ignoring", { + workflowName, + uid, + phase, + jobId, + }); + return; + } const isCompleted = exitedStatus.includes(status); const completedAt = isCompleted && finishedAt != null ? new Date(finishedAt) : null; + // Filter on status NOT IN exitedStatus so a late-arriving non-terminal event + // (Running/Pending) cannot regress a job that already settled to a terminal + // state. The sensor fans out ~13 near-simultaneous fires per workflow; this + // makes the handler idempotent without a separate read+transaction. const [updated] = await db .update(schema.job) .set({ @@ -50,10 +75,32 @@ export const handleArgoWorkflow = async (payload: ArgoWorkflowPayload) => { completedAt, updatedAt: new Date(), }) - .where(eq(schema.job.id, jobId)) + .where( + and( + eq(schema.job.id, jobId), + notInArray(schema.job.status, exitedStatus), + ), + ) .returning(); - if (updated == null) return; + if (updated == null) { + logger.info("Argo webhook produced no update", { + workflowName, + uid, + phase, + jobId, + mappedStatus: status, + }); + return; + } + + logger.info("Argo webhook updated job", { + workflowName, + uid, + phase, + jobId, + mappedStatus: status, + }); const result = await db .select({ workspaceId: schema.deployment.workspaceId })