From 24b0046b360e18af44a6306ae2b8a84e77c46376 Mon Sep 17 00:00:00 2001 From: pinodeca Date: Mon, 22 Jun 2026 21:51:56 +0000 Subject: [PATCH 1/3] fix(dsl): compute cron wait at execution time using deterministic clock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit df.wait_for_schedule() previously baked the wait duration at df.start() time via Utc::now(), which meant any delay between start and execution — and critically every iteration of a recurring `@>` loop — woke at the wrong moment (the stale, reused target busy-spun with wait=0 after the first tick). Now the DSL only validates the cron expression and stores it; the next tick is computed inside the orchestration using duroxide's deterministic clock (ctx.utc_now()) plus pure cron math, so it is replay-safe and correct for both single-shot and recurring waits. A NOTE references duroxide issue #34 (absolute-deadline timer) so this can later be simplified to ctx.schedule_timer_until(next). --- src/dsl.rs | 35 +++++++------- src/explain.rs | 15 +++--- src/lib.rs | 18 +++++++ src/orchestrations/execute_function_graph.rs | 49 ++++++++++++++++---- 4 files changed, 82 insertions(+), 35 deletions(-) diff --git a/src/dsl.rs b/src/dsl.rs index d0d044f..58f7d2b 100644 --- a/src/dsl.rs +++ b/src/dsl.rs @@ -3,7 +3,6 @@ //! DSL functions for defining durable SQL functions -use chrono::Utc; use cron::Schedule as CronSchedule; use pgrx::datum::DatumWithOid; use pgrx::prelude::*; @@ -272,29 +271,29 @@ pub fn sleep(seconds: i64) -> String { } /// Creates a wait-for-schedule node that waits until the next cron match. -/// The wait duration is computed at DSL time (when this function is called) -/// to ensure deterministic replay in the orchestration. +/// +/// The cron expression is only *validated* here (at DSL time) so an invalid +/// expression fails fast at `df.start()`. The actual "next tick" is computed at +/// execution time inside the orchestration (see `execute_wait_schedule_node`), +/// using duroxide's deterministic clock. This is intentional: a cron schedule is +/// a function of the current wall-clock time, so it must be evaluated when the +/// node actually runs — not at `df.start()` time — otherwise any delay between +/// `df.start()` and execution (and, critically, every iteration of a recurring +/// `@>` loop) would wake at the wrong moment. Only the cron expression is stored +/// in the node config. #[pg_extern(schema = "df")] pub fn wait_for_schedule(cron_expr: &str) -> String { + // Validate eagerly so a bad expression is rejected at df.start() time. The + // "0 " prefix supplies the seconds field the `cron` crate expects; the same + // prefix is re-applied at execution time when the schedule is recomputed. let cron_with_seconds = format!("0 {cron_expr}"); - let schedule = match CronSchedule::from_str(&cron_with_seconds) { - Ok(s) => s, - Err(e) => pgrx::error!("Invalid cron expression '{}': {}", cron_expr, e), - }; - - // Compute wait duration NOW (at DSL time) for deterministic orchestration replay - let now = Utc::now(); - let next = match schedule.upcoming(Utc).next() { - Some(t) => t, - None => pgrx::error!("No upcoming schedule found for '{}'", cron_expr), - }; - - let duration_secs = (next - now).num_seconds().max(0) as u64; + if let Err(e) = CronSchedule::from_str(&cron_with_seconds) { + pgrx::error!("Invalid cron expression '{}': {}", cron_expr, e); + } - // Store pre-computed seconds, not the cron expression + // Store only the cron expression. The wait is computed at execution time. let config = serde_json::json!({ "cron_expr": cron_expr, - "wait_seconds": duration_secs }); Durofut { diff --git a/src/explain.rs b/src/explain.rs index 2ac8c3b..796c3ba 100644 --- a/src/explain.rs +++ b/src/explain.rs @@ -647,18 +647,15 @@ fn format_node_display(node: &ExplainNode) -> String { format!("SLEEP {seconds}s{name_suffix}") } "WAIT_SCHEDULE" => { - // Parse config to get cron expression and wait seconds - let (cron, secs) = node + // Parse config to get the cron expression. The next tick is computed + // at execution time (not stored), so only the cron expr is shown. + let cron = node .query .as_ref() .and_then(|q| serde_json::from_str::(q).ok()) - .map(|cfg| { - let c = cfg["cron_expr"].as_str().unwrap_or("?").to_string(); - let s = cfg["wait_seconds"].as_u64().unwrap_or(0); - (c, s) - }) - .unwrap_or_else(|| ("?".to_string(), 0)); - format!("WAIT '{cron}' ({secs}s){name_suffix}") + .and_then(|cfg| cfg["cron_expr"].as_str().map(|s| s.to_string())) + .unwrap_or_else(|| "?".to_string()); + format!("WAIT '{cron}'{name_suffix}") } "HTTP" => { // Parse config to get method and URL diff --git a/src/lib.rs b/src/lib.rs index 76b26a3..39e29b3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -906,6 +906,24 @@ mod tests { let json = crate::dsl::wait_for_schedule("*/5 * * * *"); let fut = Durofut::from_json(&json); assert_eq!(fut.node_type, "WAIT_SCHEDULE"); + + // The node stores only the cron expression; the next tick is computed at + // execution time, so there must be no pre-computed wait baked in at DSL time. + let config: serde_json::Value = + serde_json::from_str(fut.query.as_ref().expect("query must be set")).unwrap(); + assert_eq!( + config["cron_expr"].as_str(), + Some("*/5 * * * *"), + "cron_expr should be preserved" + ); + assert!( + config.get("wait_seconds").is_none(), + "config must not pre-compute wait_seconds at DSL time" + ); + assert!( + config.get("target_timestamp").is_none(), + "config must not pre-compute a target_timestamp at DSL time" + ); } #[pg_test] diff --git a/src/orchestrations/execute_function_graph.rs b/src/orchestrations/execute_function_graph.rs index e593610..d4286e0 100644 --- a/src/orchestrations/execute_function_graph.rs +++ b/src/orchestrations/execute_function_graph.rs @@ -9,8 +9,11 @@ //! - Same input must always produce the same scheduling decisions use std::collections::HashMap; +use std::str::FromStr; use std::time::Duration; +use chrono::{DateTime, Utc}; +use cron::Schedule as CronSchedule; use duroxide::OrchestrationContext; use crate::activities; @@ -509,20 +512,50 @@ async fn execute_wait_schedule_node( .as_ref() .ok_or_else(|| format!("WAIT_SCHEDULE node {node_id} has no config"))?; - // Parse pre-computed config from DSL time let config: serde_json::Value = serde_json::from_str(config_str) .map_err(|e| format!("Invalid WAIT_SCHEDULE config: {e}"))?; - let wait_seconds = config["wait_seconds"] - .as_u64() - .ok_or_else(|| "WAIT_SCHEDULE missing wait_seconds".to_string())?; - - let cron_expr = config["cron_expr"].as_str().unwrap_or("?"); + let cron_expr = config["cron_expr"] + .as_str() + .ok_or_else(|| "WAIT_SCHEDULE missing cron_expr".to_string())?; + + // A cron schedule is a function of "now", so the next tick MUST be computed + // when this node actually executes — not at df.start() time — so that any + // delay before execution, and every iteration of a recurring `@>` loop, + // targets the correct upcoming tick. + // + // `ctx.utc_now()` is duroxide's deterministic clock (the only sanctioned way + // to read wall-clock time in this deterministic file): the value is recorded + // in history and replayed verbatim. The cron math below is pure given `now`, + // so the whole computation is replay-safe. The "0 " prefix supplies the + // seconds field the `cron` crate expects (mirrors df.wait_for_schedule()). + let now: DateTime = ctx + .utc_now() + .await + .map_err(|e| format!("WAIT_SCHEDULE failed to read deterministic clock: {e}"))? + .into(); + + let cron_with_seconds = format!("0 {cron_expr}"); + let schedule = CronSchedule::from_str(&cron_with_seconds) + .map_err(|e| format!("Invalid cron expression '{cron_expr}': {e}"))?; + let next = schedule + .after(&now) + .next() + .ok_or_else(|| format!("No upcoming schedule found for '{cron_expr}'"))?; + + // Clamp to zero if the tick is already in the past by the time we get here. + // + // NOTE: once duroxide gains an absolute-deadline timer + // (https://github.com/microsoft/duroxide/issues/34), this `now`-read + + // subtraction can be replaced with `ctx.schedule_timer_until(next)`, which + // targets the absolute tick directly and drops the extra utc_now() syscall. + let wait = (next - now).to_std().unwrap_or(Duration::ZERO); ctx.trace_info(format!( - "Waiting {wait_seconds} seconds until schedule: {cron_expr}" + "Waiting {}s until next schedule tick {next} (cron: {cron_expr})", + wait.as_secs() )); - ctx.schedule_timer(Duration::from_secs(wait_seconds)).await; + ctx.schedule_timer(wait).await; Ok(r#"{"scheduled": true}"#.to_string()) } From bf2089d58d1f302a4f9d85b78ae31e5ea3ceabd1 Mon Sep 17 00:00:00 2001 From: pinodeca Date: Mon, 22 Jun 2026 22:46:46 +0000 Subject: [PATCH 2/3] docs(changelog): note wait_for_schedule replay break for in-flight instances --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a0b82a1..74a6086 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ Pre-1.0 note: while `pg_durable` is in major version `0`, minor releases may inc ### Changed +- **`df.wait_for_schedule()` cron timing:** the next cron tick is now computed at execution time using duroxide's deterministic clock (`ctx.utc_now()`) inside the `execute_function_graph` orchestration, instead of being pre-computed at `df.start()` time. This makes recurring `@>` schedules and any start-to-execution delay target the correct upcoming tick (#130). + + > ⚠️ **Replay-breaking for in-flight `wait_for_schedule` instances.** This change adds a recorded `utc_now()` decision before the WAIT_SCHEDULE timer, altering the orchestration's history sequence. Any durable function that was started under a `<= 0.2.3` binary and is **mid-`wait_for_schedule`** (parked on its timer) when this `.so` is loaded will fail with a duroxide nondeterminism error on replay, because its recorded history no longer matches the new code. Drain or allow such in-flight `wait_for_schedule` instances to complete before upgrading. Instances that are not currently inside a `wait_for_schedule` node are unaffected. We accepted this break (rather than introducing orchestration versioning) given the early pre-1.0 stage of the project. + - **`df.grant_usage()` / `df.revoke_usage()`:** dropped the explicit per-function `EXECUTE` allowlist. Schema `USAGE` on `df` is the real access gate for ordinary `df.*` functions, so the helpers now grant/revoke schema `USAGE`, the table privileges, and `EXECUTE` only on the sensitive functions (`df.http`, `df.grant_usage`, `df.revoke_usage`). Function signatures are unchanged and existing privileges are unaffected (#242). ### Removed From a2bb77fffbe32de32ccfd107f50fe175be32659c Mon Sep 17 00:00:00 2001 From: pinodeca Date: Mon, 22 Jun 2026 23:01:26 +0000 Subject: [PATCH 3/3] test(e2e): add behavioral test for execution-time cron evaluation Adds 24_wait_for_schedule_exec_time.sql, which fails under the old df.start()-time cron computation and passes with the new execution-time computation. A df.sleep(30) before the wait introduces a start->execution delay; the new code recomputes the next ':00' tick at execution time (fires near the minute boundary, second ~= 0) while the old code reused a fixed offset and would fire ~30s into the minute. Asserts the fire lands before second 15 (the midpoint) to distinguish the two implementations. --- .../sql/24_wait_for_schedule_exec_time.sql | 77 +++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 tests/e2e/sql/24_wait_for_schedule_exec_time.sql diff --git a/tests/e2e/sql/24_wait_for_schedule_exec_time.sql b/tests/e2e/sql/24_wait_for_schedule_exec_time.sql new file mode 100644 index 0000000..eb87fd4 --- /dev/null +++ b/tests/e2e/sql/24_wait_for_schedule_exec_time.sql @@ -0,0 +1,77 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- Tests: df.wait_for_schedule() computes the next cron tick at *execution time* +-- (issue #130), not at df.start() time. +-- +-- Why this distinguishes the fix from the old behavior: +-- The cron '* * * * *' fires at second :00 of every minute. The workflow runs +-- a df.sleep(30) *before* the wait, so the wait node starts ~30s after +-- df.start(). It then records clock_timestamp() the instant the wait fires. +-- +-- NEW (execution-time): the next :00 tick is recomputed when the wait runs, +-- so it lands on the minute boundary -> second ~= 0. +-- OLD (df.start()-time): the wait was pre-computed as "seconds until the next +-- :00 from df.start()" and that fixed offset was reused +-- after the 30s sleep, firing ~30s into the minute +-- (second ~= 30) instead of on the boundary. +-- +-- Asserting the fire lands near :00 therefore PASSES on the new implementation +-- and FAILS on the old one. +-- +-- Runtime: up to ~95s (30s sleep + up to 60s until the next minute boundary). + +DROP TABLE IF EXISTS wait_sched_exec_test; +CREATE TABLE wait_sched_exec_test (id SERIAL, fired_at TIMESTAMPTZ); + +CREATE TEMP TABLE _test_state (instance_id TEXT); + +INSERT INTO _test_state SELECT df.start( + df.sleep(30) + ~> df.wait_for_schedule('* * * * *') + ~> 'INSERT INTO wait_sched_exec_test (fired_at) VALUES (clock_timestamp())', + 'test-wait-schedule-exec-time' +); + +DO $$ +DECLARE + inst_id TEXT; + status TEXT; + sec INT; +BEGIN + SELECT instance_id INTO inst_id FROM _test_state; + RAISE NOTICE 'Testing instance: %', inst_id; + + -- 30s sleep + up to 60s cron wait + scheduling latency; give generous headroom. + SELECT df.await_instance(inst_id, 150) INTO status; + + IF status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED: status = %', status; + END IF; + + SELECT date_part('second', fired_at)::int INTO sec + FROM wait_sched_exec_test + ORDER BY id DESC + LIMIT 1; + + IF sec IS NULL THEN + RAISE EXCEPTION 'TEST FAILED: workflow did not record a fired_at row'; + END IF; + + RAISE NOTICE 'wait_for_schedule fired at second-of-minute = %', sec; + + -- New code lands near :00 (second ~= 0); the old pre-computed-at-start code + -- would fire ~30s into the minute. 15 is the midpoint between those outcomes. + IF sec >= 15 THEN + RAISE EXCEPTION + 'TEST FAILED: wait_for_schedule fired at second % of the minute, expected near the :00 ' + 'boundary. The cron wait appears to have been computed at df.start() time instead of ' + 'execution time (issue #130).', sec; + END IF; + + RAISE NOTICE 'TEST PASSED: wait_for_schedule computed next tick at execution time'; +END $$; + +DROP TABLE _test_state; +DROP TABLE wait_sched_exec_test; +SELECT 'TEST PASSED' AS result;