Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/api/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
RUN apk add --no-cache libc6-compat python3 make g++ curl

ENV PNPM_HOME="/pnpm"
ENV PATH="$PNPM_HOME:$PATH"
ENV PATH="$PNPM_HOME/bin:$PATH"

RUN npm install -g turbo
RUN npm install -g corepack@latest
Expand Down Expand Up @@ -59,7 +59,7 @@

ENV NODE_ENV=production
ENV HOSTNAME=0.0.0.0
ENV AUTH_TRUST_HOST=true

Check warning on line 62 in apps/api/Dockerfile

View workflow job for this annotation

GitHub Actions / build / build

Sensitive data should not be used in the ARG or ENV commands

SecretsUsedInArgOrEnv: Do not use ARG or ENV instructions for sensitive data (ENV "AUTH_TRUST_HOST") More info: https://docs.docker.com/go/dockerfile/rule/secrets-used-in-arg-or-env/
ENV PORT=8081

WORKDIR /app/apps/api/dist/src/
Expand Down
22 changes: 22 additions & 0 deletions apps/api/src/routes/argoworkflow/__tests__/workflow.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { describe, expect, it } from "vitest";
import { JobStatus } from "@ctrlplane/validators/jobs";

import { mapTriggerToStatus } from "../workflow.js";

describe("mapTriggerToStatus", () => {
it.each([
["Pending", JobStatus.Pending],
["Running", JobStatus.InProgress],
["Succeeded", JobStatus.Successful],
["Failed", JobStatus.Failure],
["Error", JobStatus.Failure],
])("maps Argo phase %s to %s", (phase, expected) => {
expect(mapTriggerToStatus(phase)).toBe(expected);
});

it("returns null for unknown phases", () => {
expect(mapTriggerToStatus("Skipped")).toBeNull();
expect(mapTriggerToStatus("")).toBeNull();
expect(mapTriggerToStatus("succeeded")).toBeNull();
});
});
12 changes: 11 additions & 1 deletion apps/api/src/routes/argoworkflow/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import { Router } from "express";
import { eq } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as schema from "@ctrlplane/db/schema";
import { logger } from "@ctrlplane/logger";

import type { ArgoWorkflowPayload } from "./workflow.js";
import { handleArgoWorkflow } from "./workflow.js";

export const createArgoWorkflowRouter = (): Router =>
Expand Down Expand Up @@ -46,7 +48,15 @@ const handleWebhookRequest = async (req: Request, res: Response) => {
return;
}

const payload = req.body;
const payload = req.body as ArgoWorkflowPayload;
logger.info("Argo webhook received", {
jobAgentId: id,
workflowName: payload.workflowName,
uid: payload.uid,
phase: payload.phase,
jobId: payload.jobId,
eventType: payload.eventType,
});
await handleArgoWorkflow(payload);
res.status(200).send();
};
65 changes: 56 additions & 9 deletions apps/api/src/routes/argoworkflow/workflow.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { eq } from "@ctrlplane/db";
import { and, eq, notInArray } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import { enqueueAllReleaseTargetsDesiredVersion } from "@ctrlplane/db/reconcilers";
import * as schema from "@ctrlplane/db/schema";
import { logger } from "@ctrlplane/logger";
import { exitedStatus, JobStatus } from "@ctrlplane/validators/jobs";

interface ArgoWorkflowPayload {
export interface ArgoWorkflowPayload {
workflowName: string;
namespace: string;
uid: string;
Expand All @@ -16,31 +17,55 @@ interface ArgoWorkflowPayload {
eventType: string;
}

// Maps Argo Workflow phases (Pending | Running | Succeeded | Failed | Error)
// onto ctrlplane job statuses. Argo's "Error" phase covers controller/infra
// failures (timeouts, unschedulable pods, exit-handler crashes); ctrlplane has
// no dedicated enum value for it, so it collapses into Failure together with
// user-code "Failed".
const statusMap: Record<string, JobStatus> = {
  Pending: JobStatus.Pending,
  Running: JobStatus.InProgress,
  Succeeded: JobStatus.Successful,
  Failed: JobStatus.Failure,
  Error: JobStatus.Failure,
};

/**
 * Translate an Argo phase string into a ctrlplane JobStatus.
 *
 * @param trigger - the raw (case-sensitive) Argo phase string.
 * @returns the mapped JobStatus, or null for any phase not in the map so
 *   callers can ignore the event.
 */
export const mapTriggerToStatus = (trigger: string): JobStatus | null => {
  const mapped = statusMap[trigger];
  return mapped ?? null;
};

export const getJobId = (payload: ArgoWorkflowPayload) =>
payload.jobId ?? payload.workflowName;
export const getJobId = (payload: ArgoWorkflowPayload) => payload.jobId;

export const handleArgoWorkflow = async (payload: ArgoWorkflowPayload) => {
const { uid, phase, startedAt, finishedAt } = payload;
const { uid, phase, startedAt, finishedAt, workflowName } = payload;

const jobId = getJobId(payload);
if (jobId == null) {
logger.warn("Argo webhook missing job-id label, ignoring", {
workflowName,
uid,
phase,
});
return;
}

const status = statusMap[phase] ?? null;
if (status == null) return;
const status = mapTriggerToStatus(phase);
if (status == null) {
logger.warn("Argo webhook with unmapped phase, ignoring", {
workflowName,
uid,
phase,
jobId,
});
return;
}

const isCompleted = exitedStatus.includes(status);
const completedAt =
isCompleted && finishedAt != null ? new Date(finishedAt) : null;

// Filter on status NOT IN exitedStatus so a late-arriving non-terminal event
// (Running/Pending) cannot regress a job that already settled to a terminal
// state. The sensor fans out ~13 near-simultaneous fires per workflow; this
// makes the handler idempotent without a separate read+transaction.
const [updated] = await db
.update(schema.job)
.set({
Expand All @@ -50,10 +75,32 @@ export const handleArgoWorkflow = async (payload: ArgoWorkflowPayload) => {
completedAt,
updatedAt: new Date(),
})
.where(eq(schema.job.id, jobId))
.where(
and(
eq(schema.job.id, jobId),
notInArray(schema.job.status, exitedStatus),
),
)
.returning();

if (updated == null) return;
if (updated == null) {
logger.info("Argo webhook produced no update", {
workflowName,
uid,
phase,
jobId,
mappedStatus: status,
});
return;
}

logger.info("Argo webhook updated job", {
workflowName,
uid,
phase,
jobId,
mappedStatus: status,
});

const result = await db
.select({ workspaceId: schema.deployment.workspaceId })
Expand Down
Loading