8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,14 @@ Pre-1.0 note: while `pg_durable` is in major version `0`, minor releases may inc

## [0.2.4] - Unreleased

### Added

- **Skipped node terminal status:** downstream nodes that are not executed after an upstream node failure are now marked `skipped` instead of remaining indefinitely `pending` (#240).

### Changed

- **Failure observability:** on node-level workflow failure, pg_durable now performs a terminal reconciliation pass to mark remaining `pending` nodes as `skipped` when supported by the installed schema.

## [0.2.3] - 2026-06-17

Provider-line note: v0.2.3 stays in the `duroxide-pg` provider compatibility line started in v0.2.2, so the upgrade source is v0.2.2 (`sql/pg_durable--0.2.2--0.2.3.sql`).
1 change: 1 addition & 0 deletions USER_GUIDE.md
@@ -1300,6 +1300,7 @@ SQL |=> 'a': SELECT 1
| `✗ Failed` | Node encountered an error |
| `⏳ Running` | Node currently executing |
| `○ Pending` | Node waiting to execute |
| `⊘ Skipped` | Node was not executed because an upstream node failed |

### Visualizing Complex Structures

168 changes: 168 additions & 0 deletions docs/issue-240-skipped-status-plan.md
@@ -0,0 +1,168 @@
# Issue 240 Design and Implementation Plan

## Summary

Issue 240 requests a clear way to distinguish nodes that were never executed because the workflow already failed. Today those nodes remain in `pending`, which is ambiguous.

Proposed enhancement:
- Add a terminal node status: `skipped`.
- On workflow failure, convert remaining `pending` nodes to `skipped` when the failure came from node execution (that is, at least one node is `failed`).

This keeps the existing instance-level status model unchanged (`df.instances.status` still ends as `failed`) while making node-level outcomes explicit.

## Current Behavior (Observed)

Live repro on a local PostgreSQL instance:
- Step 1 SQL node: `completed`
- Step 2 SQL node (intentional error): `failed`
- Step 3 SQL node (never executed): `pending`
- Instance: `failed`

This is the ambiguity reported in issue 240.

## Goals

- Make unexecuted downstream nodes observable as `skipped` after terminal workflow failure.
- Preserve backward compatibility of the new binary against old schemas.
- Keep implementation minimal and low-risk (no new tables or public function signatures).

## Non-Goals

- No change to `df.instances.status` vocabulary.
- No new monitoring projection table in this iteration.
- No attempt to classify every failure mode as producing `skipped` (for example, pre-execution policy rejection may remain as-is).

## Proposed Design

### 1. Schema: add `skipped` to allowed node statuses

Update node status check constraints so `skipped` is valid:
- Install DDL in `src/lib.rs`:
  - `nodes_status_chk`: include `skipped`.
- Upgrade DDL in next upgrade script:
  - drop and recreate `nodes_status_chk` (or equivalent alteration) to include `skipped`.

`nodes_result_status_chk` can remain unchanged because `skipped` nodes should not carry `result`.
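
To make the interaction of the two constraints concrete, here is a minimal in-memory sketch in Rust. It is not part of the PR: `row_passes_checks` is a hypothetical helper, and the two status lists are copied from the `nodes_status_chk` and `nodes_result_status_chk` definitions in `src/lib.rs`.

```rust
/// Statuses accepted by `nodes_status_chk` once `skipped` is added.
const ALLOWED_STATUSES: [&str; 5] = ["pending", "running", "completed", "failed", "skipped"];

/// Statuses that may carry a `result` under `nodes_result_status_chk` (unchanged).
const RESULT_STATUSES: [&str; 2] = ["completed", "failed"];

/// Hypothetical helper: true when a (status, result) pair would satisfy both checks.
fn row_passes_checks(status: &str, result: Option<&str>) -> bool {
    let status_ok = ALLOWED_STATUSES.contains(&status);
    // Mirrors `result IS NULL OR status = ANY (ARRAY['completed', 'failed'])`.
    let result_ok = result.is_none() || RESULT_STATUSES.contains(&status);
    status_ok && result_ok
}
```

Under this model a `skipped` row with no `result` is valid, while a `skipped` row with a `result` is rejected by the unchanged result constraint, which is why that constraint needs no edit.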

### 2. Runtime: mark pending nodes as skipped at terminal failure

Add an activity that performs one set-based update for a single instance:

- New activity (suggested): `mark_pending_nodes_skipped`.
- SQL behavior:
  - `UPDATE df.nodes`
  - `SET status = 'skipped', updated_at = now()`
  - `WHERE instance_id = $1 AND status = 'pending'`
  - guarded by `EXISTS (SELECT 1 FROM df.nodes WHERE instance_id = $1 AND status = 'failed')`

Guard rationale:
- Avoid changing semantics for failures that occur before any node execution (for example, instance-level rejection paths that currently do not mark node failures).
- Keep behavior aligned with issue wording: downstream steps skipped due to an earlier step failure.
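
The guarded update can be modeled in memory to check its semantics. The sketch below is an assumption-labeled stand-in, not the activity itself: `NodeStatus` and this `mark_pending_nodes_skipped` function are hypothetical, and a slice of statuses plays the role of the `df.nodes` rows for one instance.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NodeStatus {
    Pending,
    Running,
    Completed,
    Failed,
    Skipped,
}

/// In-memory stand-in for the guarded UPDATE; returns the number of rows changed.
fn mark_pending_nodes_skipped(nodes: &mut [NodeStatus]) -> usize {
    // Guard: corresponds to `EXISTS (... status = 'failed')`.
    if !nodes.iter().any(|s| *s == NodeStatus::Failed) {
        return 0;
    }
    let mut changed = 0;
    for status in nodes.iter_mut() {
        // Only rows still in `pending` are touched; every other status is left alone.
        if *status == NodeStatus::Pending {
            *status = NodeStatus::Skipped;
            changed += 1;
        }
    }
    changed
}
```

Running the function a second time on the same slice changes nothing, because no `pending` rows remain. That is the property that makes a retried activity safe.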

### 3. Orchestration integration point

In the top-level failure path of `execute_function_graph` (where the instance is being moved to `failed`):
- After the node failure is recorded and before the instance status update, schedule the new activity once for that instance.
- Make the update idempotent and best-effort: it must be safe to retry, and a failure of this activity must not prevent the instance from being marked `failed`.

Why this placement:
- Central place where terminal failure is decided.
- Avoids needing per-node graph traversal logic.
- Handles linear and composite graphs (`THEN`, `IF`, `JOIN`, `RACE`, `LOOP`) uniformly.
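
The best-effort ordering can be sketched independently of the orchestration runtime. In the sketch below, `handle_terminal_failure` and both closure parameters are hypothetical stand-ins for scheduling the two activities. Only the control flow is meant to match the plan: log and continue if reconciliation fails, then record the instance failure regardless.

```rust
/// Hypothetical model of the terminal failure path: reconcile node statuses
/// first (best-effort), then always record the instance-level failure.
fn handle_terminal_failure<M, S>(
    instance_id: &str,
    mut mark_skipped: M,
    mut set_instance_failed: S,
    log: &mut Vec<String>,
) -> Result<String, String>
where
    M: FnMut(&str) -> Result<String, String>,
    S: FnMut(&str) -> Result<String, String>,
{
    // A reconciliation error is recorded but never propagated.
    if let Err(e) = mark_skipped(instance_id) {
        log.push(format!(
            "Failed to mark pending nodes as skipped for instance {instance_id}: {e}"
        ));
    }
    // The instance status update runs whether or not the step above succeeded.
    set_instance_failed(instance_id)
}
```

With a `mark_skipped` closure that returns `Err`, the function still calls `set_instance_failed` and leaves one entry in `log`, so the worst case is the legacy outcome of nodes remaining `pending`.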

### 4. Optional hardening in `update_node_status`

No behavior change is required, but confirm the following during review:
- Keep allowing transitions to `completed` / `failed` as today.
- Ensure no code path writes result for `skipped`.

## Backward Compatibility and Upgrade Strategy

### Binary backward compatibility (B1)

New `.so` may run against an older schema where `nodes_status_chk` does not include `skipped`.
If runtime writes `skipped` in that state, updates would fail.

Plan:
- Runtime schema detection for `skipped` support before attempting the bulk update.
- If unsupported, no-op and keep legacy behavior (`pending`).

Implementation options:
- Option A (preferred): activity checks `pg_constraint` definition for `nodes_status_chk` containing `skipped`.
- Option B: attempt the update inside a savepoint (or equivalent error handling) and ignore the resulting check-constraint violation.

Option A is clearer and avoids noisy errors.
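
The decision Option A makes reduces to a substring test on the constraint definition, with a missing constraint treated as unsupported. A minimal sketch, assuming the definition text has already been fetched (for example via `pg_get_constraintdef`); `skipped_supported` is a hypothetical helper, and the definition strings used with it here are illustrative rather than captured from a live database:

```rust
/// Mirrors `COALESCE(<constraint definition> LIKE '%''skipped''%', false)`:
/// `None` models a schema where `nodes_status_chk` was not found.
fn skipped_supported(constraint_def: Option<&str>) -> bool {
    constraint_def.map_or(false, |def| def.contains("'skipped'"))
}
```

Matching on the quoted literal `'skipped'` rather than the bare word avoids a false positive if the word ever appears elsewhere in the definition, such as in a column name.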

### Schema upgrade (A/B2)

- Create the next upgrade script, `sql/pg_durable--0.2.3--0.2.4.sql` (v0.2.3 is the upgrade source for v0.2.4).
- Add DDL to update `nodes_status_chk` to include `skipped`.
- Ensure fresh-install schema (from current `src/lib.rs` extension SQL) matches upgraded schema.

## Test Plan

### Unit / Rust-level

- Activity test: when instance has a failed node plus pending nodes, only pending nodes become `skipped`.
- Activity test: when no failed node exists, no rows are changed.
- Compatibility test hook: when schema does not support `skipped`, activity no-ops without error.

### E2E SQL

Add a new E2E SQL test (for example `tests/e2e/sql/49_failed_downstream_nodes_skipped.sql`):
- Build a 3-step sequence where step 2 fails.
- Wait for terminal instance status `failed`.
- Assert:
  - step 1 node is `completed`
  - step 2 node is `failed`
  - step 3 node is `skipped` (not `pending`)
- Include clear failure messages.

Also verify an instance-level failure path with no node failure (if represented in existing tests) does not force all nodes to `skipped`.

### Upgrade tests

Run:
- `./scripts/test-upgrade.sh`

Focus expectations:
- Scenario A: fresh install vs upgraded schema parity for `nodes_status_chk`.
- Scenario B1: new `.so` still works against old schema; `skipped` behavior degrades safely to legacy (`pending`) until upgrade.
- Scenario B2: data remains accessible post-upgrade.
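
For Scenario A, the parity expectation amounts to comparing the set of status literals in the two constraint definitions. The sketch below is one hedged way to express that check. It is not how `./scripts/test-upgrade.sh` works: both function names are hypothetical, and the parsing assumes status literals contain no escaped quotes.

```rust
use std::collections::BTreeSet;

/// Collects the single-quoted literals found in a constraint definition string.
fn quoted_literals(def: &str) -> BTreeSet<String> {
    // Splitting on `'` leaves the quoted contents at the odd positions.
    def.split('\'')
        .enumerate()
        .filter(|(i, _)| i % 2 == 1)
        .map(|(_, s)| s.to_string())
        .collect()
}

/// True when fresh-install and upgraded definitions allow the same status vocabulary.
fn status_vocab_matches(fresh_def: &str, upgraded_def: &str) -> bool {
    quoted_literals(fresh_def) == quoted_literals(upgraded_def)
}
```

Comparing sets rather than raw strings keeps the check insensitive to literal order and to formatting differences between the install DDL and the upgrade script.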

## Docs Plan

Update user-facing status vocabulary references:
- `USER_GUIDE.md` (node status semantics)
- `docs/api-reference.md` if status values are documented there
- Optional release note entry in `CHANGELOG.md`

## Rollout and Risk

Risks:
- Writing `skipped` against non-upgraded schema (mitigated by runtime check).
- Unexpected interactions with in-flight parallel constructs (mitigated by set-based terminal update and idempotence).

Rollout:
1. Land schema + runtime + tests in one PR.
2. Validate full local matrix: fmt, clippy, unit, e2e, upgrade.
3. Document behavior change as node-level observability enhancement.

## Acceptance Criteria

- Failed workflow with downstream unexecuted nodes shows `skipped` for those nodes (post-upgrade schema).
- No regression to existing instance terminal statuses.
- New binary remains functional against pre-upgrade schema.
- E2E and upgrade tests pass.

## Implementation Checklist

- [ ] Add `skipped` to node status check constraint in install DDL (`src/lib.rs`).
- [ ] Add upgrade script change for `nodes_status_chk`.
- [ ] Add new activity to mark pending nodes as skipped for failed instances.
- [ ] Register activity in `src/registry.rs`.
- [ ] Call activity from orchestration failure path.
- [ ] Add schema-compatibility guard for pre-upgrade schemas.
- [ ] Add E2E SQL coverage.
- [ ] Update docs and changelog notes.
- [ ] Run fmt, clippy, unit, e2e, upgrade tests.
8 changes: 8 additions & 0 deletions docs/upgrade-testing.md
@@ -203,6 +203,14 @@ gate, so they never need to be added to the exclude list.
Each schema-changing PR should add a section here documenting what changed,
what the upgrade script handles, and any backward compatibility considerations.

### v0.2.3 → v0.2.4

#### #240 node-level `skipped` status for downstream unexecuted steps
- **DDL change:** `df.nodes.status` constraint (`nodes_status_chk`) now allows `skipped` in addition to `pending`, `running`, `completed`, and `failed`. Upgrade script: `sql/pg_durable--0.2.3--0.2.4.sql`.
- **Scenario A considerations:** fresh-install and upgraded schemas must agree on the `nodes_status_chk` status vocabulary including `skipped`.
- **Scenario B1 considerations:** the new `.so` must still run against pre-0.2.4 schemas where `nodes_status_chk` does not allow `skipped`. Runtime logic therefore detects schema support first and no-ops (retains legacy `pending` behavior) when unsupported.
- **Scenario B2 considerations:** no data migration required. Existing rows are preserved; only new terminal reconciliation on failed runs can mark unexecuted nodes as `skipped` post-upgrade.

### v0.2.2 → v0.2.3

#### Rename duroxide provider schema to `_duroxide` for fresh installs
77 changes: 77 additions & 0 deletions src/activities/mark_pending_nodes_skipped.rs
@@ -0,0 +1,77 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the PostgreSQL License.

//! MarkPendingNodesSkipped activity - marks unexecuted nodes as skipped
//! after a node-level failure.

use duroxide::ActivityContext;
use sqlx::PgPool;
use std::sync::Arc;

/// Activity name for registration and scheduling
pub const NAME: &str = "pg_durable::activity::mark-pending-nodes-skipped";

/// Mark pending nodes as skipped for a failed instance.
///
/// Behavior:
/// - No-op on schemas that do not support 'skipped' in nodes_status_chk.
/// - No-op unless the instance has at least one failed node.
/// - Updates only nodes still in 'pending'.
pub async fn execute(
    ctx: ActivityContext,
    pool: Arc<PgPool>,
    input_json: String,
) -> Result<String, String> {
    let input: serde_json::Value = serde_json::from_str(&input_json)
        .map_err(|e| format!("Failed to parse skipped-status input: {e}"))?;

    let instance_id = input["instance_id"].as_str().ok_or("Missing instance_id")?;

    // Detect whether nodes_status_chk allows 'skipped'; pre-upgrade schemas do not.
    // A missing constraint yields NULL, which COALESCE maps to false.
    let skipped_supported: bool = sqlx::query_scalar(
        "SELECT COALESCE(
            (
                SELECT pg_catalog.pg_get_constraintdef(c.oid)
                FROM pg_catalog.pg_constraint c
                JOIN pg_catalog.pg_class t ON t.oid = c.conrelid
                JOIN pg_catalog.pg_namespace n ON n.oid = t.relnamespace
                WHERE n.nspname = 'df'
                  AND t.relname = 'nodes'
                  AND c.conname = 'nodes_status_chk'
                LIMIT 1
            ) LIKE '%''skipped''%',
            false
        )",
    )
    .fetch_one(pool.as_ref())
    .await
    .map_err(|e| format!("Failed to detect skipped status support: {e}"))?;

    if !skipped_supported {
        ctx.trace_info(format!(
            "Schema does not support node status 'skipped'; leaving pending nodes unchanged for instance {instance_id}"
        ));
        return Ok("Skipped status unsupported on schema; no-op".to_string());
    }

    // Single set-based update, guarded so it only fires when the instance
    // has at least one failed node. Safe to retry: no pending rows remain afterwards.
    let rows_affected = sqlx::query(
        "UPDATE df.nodes n
         SET status = 'skipped', updated_at = now()
         WHERE n.instance_id = $1
           AND n.status = 'pending'
           AND EXISTS (
               SELECT 1
               FROM df.nodes f
               WHERE f.instance_id = $1
                 AND f.status = 'failed'
           )",
    )
    .bind(instance_id)
    .execute(pool.as_ref())
    .await
    .map_err(|e| format!("Failed to mark pending nodes as skipped: {e}"))?
    .rows_affected();

    let msg = format!("Marked {rows_affected} pending nodes as skipped for instance {instance_id}");
    ctx.trace_info(&msg);
    Ok(msg)
}
1 change: 1 addition & 0 deletions src/activities/mod.rs
@@ -9,5 +9,6 @@
pub mod execute_http;
pub mod execute_sql;
pub mod load_function_graph;
pub mod mark_pending_nodes_skipped;
pub mod update_instance_status;
pub mod update_node_status;
2 changes: 1 addition & 1 deletion src/lib.rs
@@ -248,7 +248,7 @@ ALTER TABLE df.nodes
ADD CONSTRAINT nodes_result_name_chk
CHECK (result_name IS NULL OR result_name OPERATOR(pg_catalog.~) '^[A-Za-z_][A-Za-z0-9_]*$') NOT VALID,
ADD CONSTRAINT nodes_status_chk
- CHECK (status OPERATOR(pg_catalog.=) ANY (ARRAY['pending', 'running', 'completed', 'failed'])) NOT VALID,
+ CHECK (status OPERATOR(pg_catalog.=) ANY (ARRAY['pending', 'running', 'completed', 'failed', 'skipped'])) NOT VALID,
ADD CONSTRAINT nodes_result_status_chk
CHECK (result IS NULL OR status OPERATOR(pg_catalog.=) ANY (ARRAY['completed', 'failed'])) NOT VALID,
ADD CONSTRAINT nodes_structure_chk
22 changes: 21 additions & 1 deletion src/orchestrations/execute_function_graph.rs
@@ -216,8 +216,28 @@ pub async fn execute(ctx: OrchestrationContext, input_json: String) -> Result<St
}
Err(err) => {
ctx.trace_info(format!("Function failed with error: {err}"));
let instance_id = input.instance_id.clone();

// If this was a node-level failure, mark any remaining pending nodes
// as skipped for clearer terminal observability.
let skipped_input = serde_json::json!({
"instance_id": instance_id,
});
if let Err(e) = ctx
.schedule_activity(
activities::mark_pending_nodes_skipped::NAME,
skipped_input.to_string(),
)
.await
{
ctx.trace_info(format!(
"Failed to mark pending nodes as skipped for instance {}: {}",
input.instance_id, e
));
}

let status_input = serde_json::json!({
- "instance_id": input.instance_id,
+ "instance_id": instance_id,
"status": "failed"
});
let _ = ctx
5 changes: 5 additions & 0 deletions src/registry.rs
@@ -18,6 +18,7 @@ pub fn create_activity_registry(pool: Arc<PgPool>, semaphore: Arc<Semaphore>) ->
let graph_pool = pool.clone();
let status_pool = pool.clone();
let node_status_pool = pool.clone();
let skipped_pool = pool.clone();
let http_pool = pool.clone();

ActivityRegistry::builder()
@@ -37,6 +38,10 @@ pub fn create_activity_registry(pool: Arc<PgPool>, semaphore: Arc<Semaphore>) ->
let pool = node_status_pool.clone();
async move { activities::update_node_status::execute(ctx, pool, input_json).await }
})
.register(activities::mark_pending_nodes_skipped::NAME, move |ctx: ActivityContext, input_json: String| {
let pool = skipped_pool.clone();
async move { activities::mark_pending_nodes_skipped::execute(ctx, pool, input_json).await }
})
.register(activities::execute_http::NAME, move |ctx: ActivityContext, config_json: String| {
let pool = http_pool.clone();
async move { activities::execute_http::execute(ctx, pool, config_json).await }