Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ Pre-1.0 note: while `pg_durable` is in major version `0`, minor releases may inc

### Changed

- **`df.wait_for_schedule()` cron timing:** the next cron tick is now computed at execution time using duroxide's deterministic clock (`ctx.utc_now()`) inside the `execute_function_graph` orchestration, instead of being pre-computed at `df.start()` time. This makes recurring `@>` schedules and any start-to-execution delay target the correct upcoming tick (#130).

> ⚠️ **Replay-breaking for in-flight `wait_for_schedule` instances.** This change adds a recorded `utc_now()` decision before the WAIT_SCHEDULE timer, altering the orchestration's history sequence. Any durable function that was started under a `<= 0.2.3` binary and is **mid-`wait_for_schedule`** (parked on its timer) when this `.so` is loaded will fail with a duroxide nondeterminism error on replay, because its recorded history no longer matches the new code. Drain or allow such in-flight `wait_for_schedule` instances to complete before upgrading. Instances that are not currently inside a `wait_for_schedule` node are unaffected. We accepted this break (rather than introducing orchestration versioning) given the early pre-1.0 stage of the project.

- **`df.grant_usage()` / `df.revoke_usage()`:** dropped the explicit per-function `EXECUTE` allowlist. Schema `USAGE` on `df` is the real access gate for ordinary `df.*` functions, so the helpers now grant/revoke schema `USAGE`, the table privileges, and `EXECUTE` only on the sensitive functions (`df.http`, `df.grant_usage`, `df.revoke_usage`). Function signatures are unchanged and existing privileges are unaffected (#242).

### Removed
Expand Down
35 changes: 17 additions & 18 deletions src/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

//! DSL functions for defining durable SQL functions

use chrono::Utc;
use cron::Schedule as CronSchedule;
use pgrx::datum::DatumWithOid;
use pgrx::prelude::*;
Expand Down Expand Up @@ -272,29 +271,29 @@ pub fn sleep(seconds: i64) -> String {
}

/// Creates a wait-for-schedule node that waits until the next cron match.
/// The wait duration is computed at DSL time (when this function is called)
/// to ensure deterministic replay in the orchestration.
///
/// The cron expression is only *validated* here (at DSL time) so an invalid
/// expression fails fast at `df.start()`. The actual "next tick" is computed at
/// execution time inside the orchestration (see `execute_wait_schedule_node`),
/// using duroxide's deterministic clock. This is intentional: a cron schedule is
/// a function of the current wall-clock time, so it must be evaluated when the
/// node actually runs — not at `df.start()` time — otherwise any delay between
/// `df.start()` and execution (and, critically, every iteration of a recurring
/// `@>` loop) would wake at the wrong moment. Only the cron expression is stored
/// in the node config.
#[pg_extern(schema = "df")]
pub fn wait_for_schedule(cron_expr: &str) -> String {
// Validate eagerly so a bad expression is rejected at df.start() time. The
// "0 " prefix supplies the seconds field the `cron` crate expects; the same
// prefix is re-applied at execution time when the schedule is recomputed.
let cron_with_seconds = format!("0 {cron_expr}");
let schedule = match CronSchedule::from_str(&cron_with_seconds) {
Ok(s) => s,
Err(e) => pgrx::error!("Invalid cron expression '{}': {}", cron_expr, e),
};

// Compute wait duration NOW (at DSL time) for deterministic orchestration replay
let now = Utc::now();
let next = match schedule.upcoming(Utc).next() {
Some(t) => t,
None => pgrx::error!("No upcoming schedule found for '{}'", cron_expr),
};

let duration_secs = (next - now).num_seconds().max(0) as u64;
if let Err(e) = CronSchedule::from_str(&cron_with_seconds) {
pgrx::error!("Invalid cron expression '{}': {}", cron_expr, e);
}

// Store pre-computed seconds, not the cron expression
// Store only the cron expression. The wait is computed at execution time.
let config = serde_json::json!({
"cron_expr": cron_expr,
"wait_seconds": duration_secs
});

Durofut {
Expand Down
15 changes: 6 additions & 9 deletions src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,18 +647,15 @@ fn format_node_display(node: &ExplainNode) -> String {
format!("SLEEP {seconds}s{name_suffix}")
}
"WAIT_SCHEDULE" => {
// Parse config to get cron expression and wait seconds
let (cron, secs) = node
// Parse config to get the cron expression. The next tick is computed
// at execution time (not stored), so only the cron expr is shown.
let cron = node
.query
.as_ref()
.and_then(|q| serde_json::from_str::<serde_json::Value>(q).ok())
.map(|cfg| {
let c = cfg["cron_expr"].as_str().unwrap_or("?").to_string();
let s = cfg["wait_seconds"].as_u64().unwrap_or(0);
(c, s)
})
.unwrap_or_else(|| ("?".to_string(), 0));
format!("WAIT '{cron}' ({secs}s){name_suffix}")
.and_then(|cfg| cfg["cron_expr"].as_str().map(|s| s.to_string()))
.unwrap_or_else(|| "?".to_string());
format!("WAIT '{cron}'{name_suffix}")
}
"HTTP" => {
// Parse config to get method and URL
Expand Down
18 changes: 18 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,24 @@ mod tests {
let json = crate::dsl::wait_for_schedule("*/5 * * * *");
let fut = Durofut::from_json(&json);
assert_eq!(fut.node_type, "WAIT_SCHEDULE");

// The node stores only the cron expression; the next tick is computed at
// execution time, so there must be no pre-computed wait baked in at DSL time.
let config: serde_json::Value =
serde_json::from_str(fut.query.as_ref().expect("query must be set")).unwrap();
assert_eq!(
config["cron_expr"].as_str(),
Some("*/5 * * * *"),
"cron_expr should be preserved"
);
assert!(
config.get("wait_seconds").is_none(),
"config must not pre-compute wait_seconds at DSL time"
);
assert!(
config.get("target_timestamp").is_none(),
"config must not pre-compute a target_timestamp at DSL time"
);
}

#[pg_test]
Expand Down
49 changes: 41 additions & 8 deletions src/orchestrations/execute_function_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
//! - Same input must always produce the same scheduling decisions

use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;

use chrono::{DateTime, Utc};
use cron::Schedule as CronSchedule;
use duroxide::OrchestrationContext;

use crate::activities;
Expand Down Expand Up @@ -509,20 +512,50 @@ async fn execute_wait_schedule_node(
.as_ref()
.ok_or_else(|| format!("WAIT_SCHEDULE node {node_id} has no config"))?;

// Parse pre-computed config from DSL time
let config: serde_json::Value = serde_json::from_str(config_str)
.map_err(|e| format!("Invalid WAIT_SCHEDULE config: {e}"))?;

let wait_seconds = config["wait_seconds"]
.as_u64()
.ok_or_else(|| "WAIT_SCHEDULE missing wait_seconds".to_string())?;

let cron_expr = config["cron_expr"].as_str().unwrap_or("?");
let cron_expr = config["cron_expr"]
.as_str()
.ok_or_else(|| "WAIT_SCHEDULE missing cron_expr".to_string())?;

// A cron schedule is a function of "now", so the next tick MUST be computed
// when this node actually executes — not at df.start() time — so that any
// delay before execution, and every iteration of a recurring `@>` loop,
// targets the correct upcoming tick.
//
// `ctx.utc_now()` is duroxide's deterministic clock (the only sanctioned way
// to read wall-clock time in this deterministic file): the value is recorded
// in history and replayed verbatim. The cron math below is pure given `now`,
// so the whole computation is replay-safe. The "0 " prefix supplies the
// seconds field the `cron` crate expects (mirrors df.wait_for_schedule()).
let now: DateTime<Utc> = ctx
.utc_now()
.await
.map_err(|e| format!("WAIT_SCHEDULE failed to read deterministic clock: {e}"))?
.into();

let cron_with_seconds = format!("0 {cron_expr}");
let schedule = CronSchedule::from_str(&cron_with_seconds)
.map_err(|e| format!("Invalid cron expression '{cron_expr}': {e}"))?;
let next = schedule
.after(&now)
.next()
.ok_or_else(|| format!("No upcoming schedule found for '{cron_expr}'"))?;

// Clamp to zero if the tick is already in the past by the time we get here.
//
// NOTE: once duroxide gains an absolute-deadline timer
// (https://github.com/microsoft/duroxide/issues/34), this `now`-read +
// subtraction can be replaced with `ctx.schedule_timer_until(next)`, which
// targets the absolute tick directly and drops the extra utc_now() syscall.
let wait = (next - now).to_std().unwrap_or(Duration::ZERO);

ctx.trace_info(format!(
"Waiting {wait_seconds} seconds until schedule: {cron_expr}"
"Waiting {}s until next schedule tick {next} (cron: {cron_expr})",
wait.as_secs()
));
ctx.schedule_timer(Duration::from_secs(wait_seconds)).await;
ctx.schedule_timer(wait).await;

Ok(r#"{"scheduled": true}"#.to_string())
}
Expand Down
77 changes: 77 additions & 0 deletions tests/e2e/sql/24_wait_for_schedule_exec_time.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
-- Copyright (c) Microsoft Corporation.
-- Licensed under the PostgreSQL License.

-- Tests: df.wait_for_schedule() computes the next cron tick at *execution time*
-- (issue #130), not at df.start() time.
--
-- Why this distinguishes the fix from the old behavior:
-- The cron '* * * * *' fires at second :00 of every minute. The workflow runs
-- a df.sleep(30) *before* the wait, so the wait node starts ~30s after
-- df.start(). It then records clock_timestamp() the instant the wait fires.
--
-- NEW (execution-time): the next :00 tick is recomputed when the wait runs,
-- so it lands on the minute boundary -> second ~= 0.
-- OLD (df.start()-time): the wait was pre-computed as "seconds until the next
-- :00 from df.start()" and that fixed offset was reused
-- after the 30s sleep, firing ~30s into the minute
-- (second ~= 30) instead of on the boundary.
--
-- Asserting the fire lands near :00 therefore PASSES on the new implementation
-- and FAILS on the old one.
--
-- Runtime: up to ~95s (30s sleep + up to 60s until the next minute boundary).

-- Result table: the workflow's final step inserts one row whose fired_at is
-- clock_timestamp() at the moment the step runs. Dropped first so the table
-- starts empty even if an earlier run left it behind.
DROP TABLE IF EXISTS wait_sched_exec_test;
CREATE TABLE wait_sched_exec_test (id SERIAL, fired_at TIMESTAMPTZ);

-- Holds the value returned by df.start() so the DO block below can read it.
CREATE TEMP TABLE _test_state (instance_id TEXT);

-- Workflow under test: sleep 30s, wait for the next match of '* * * * *',
-- then record the time at which the wait released.
-- NOTE(review): the second df.start() argument looks like an instance
-- label/name -- confirm against the df.start() signature.
INSERT INTO _test_state SELECT df.start(
df.sleep(30)
~> df.wait_for_schedule('* * * * *')
~> 'INSERT INTO wait_sched_exec_test (fired_at) VALUES (clock_timestamp())',
'test-wait-schedule-exec-time'
);

-- Verification: wait for the workflow to finish, then assert that the recorded
-- fire time sits near the :00 minute boundary (execution-time cron evaluation)
-- rather than ~30s into the minute (df.start()-time evaluation).
DO $$
DECLARE
    inst_id TEXT;
    status TEXT;
    sec INT;
BEGIN
    SELECT instance_id INTO inst_id FROM _test_state;
    RAISE NOTICE 'Testing instance: %', inst_id;

    -- 30s sleep + up to 60s cron wait + scheduling latency; give generous headroom.
    SELECT df.await_instance(inst_id, 150) INTO status;

    -- IS DISTINCT FROM instead of != : with !=, a NULL status makes the
    -- condition NULL, the IF body is skipped, and the guard silently passes.
    IF status IS DISTINCT FROM 'completed' THEN
        RAISE EXCEPTION 'TEST FAILED: status = %', status;
    END IF;

    -- floor() before the cast: a bare double precision -> int cast rounds to
    -- nearest, so 14.6s would become 15 (spurious failure) and 59.6s would be
    -- reported as the impossible second-of-minute 60.
    SELECT floor(date_part('second', fired_at))::int INTO sec
    FROM wait_sched_exec_test
    ORDER BY id DESC
    LIMIT 1;

    -- No row selected leaves sec NULL: the workflow's INSERT step never ran.
    IF sec IS NULL THEN
        RAISE EXCEPTION 'TEST FAILED: workflow did not record a fired_at row';
    END IF;

    RAISE NOTICE 'wait_for_schedule fired at second-of-minute = %', sec;

    -- New code lands near :00 (second ~= 0); the old pre-computed-at-start code
    -- would fire ~30s into the minute. 15 is the midpoint between those outcomes.
    IF sec >= 15 THEN
        RAISE EXCEPTION
            'TEST FAILED: wait_for_schedule fired at second % of the minute, expected near the :00 '
            'boundary. The cron wait appears to have been computed at df.start() time instead of '
            'execution time (issue #130).', sec;
    END IF;

    RAISE NOTICE 'TEST PASSED: wait_for_schedule computed next tick at execution time';
END $$;

-- Cleanup: remove both tables created by this test, then emit the final
-- result row.
DROP TABLE _test_state;
DROP TABLE wait_sched_exec_test;
SELECT 'TEST PASSED' AS result;
Loading