From 5fd528455871545f73c03adb0ea582f93cc0ef5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Thu, 18 Jun 2026 17:21:50 +0200 Subject: [PATCH] =?UTF-8?q?fix:=20loop=20safety=20=E2=80=94=20max=20iterat?= =?UTF-8?q?ion=20guard=20and=20malformed=20config=20detection?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit M7: Loops now track iteration count across continue_as_new generations via FunctionInput.loop_iteration. After 100,000 iterations (~27 hours at the minimum 1-second rate limit), the loop fails with a clear error directing users to df.break() or workflow restructuring. M8: If a LOOP node's condition config is unparseable JSON, the code now returns an error immediately instead of silently falling through to infinite looping. --- src/dsl.rs | 1 + src/lib.rs | 78 ++++++++++++++++++++ src/orchestrations/execute_function_graph.rs | 76 +++++++++++++------ src/types.rs | 4 + 4 files changed, 136 insertions(+), 23 deletions(-) diff --git a/src/dsl.rs b/src/dsl.rs index f314663..6e13411 100644 --- a/src/dsl.rs +++ b/src/dsl.rs @@ -917,6 +917,7 @@ pub fn start( instance_id: instance_id.clone(), label: label.map(|s| s.to_string()), vars, + loop_iteration: 0, }; let input_json = serde_json::to_string(&input).unwrap_or(instance_id.clone()); diff --git a/src/lib.rs b/src/lib.rs index 60f82ec..f4045c6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2764,6 +2764,84 @@ mod tests { "Should produce a VALUES clause" ); } + + // --- M7: Loop iteration counter persisted across continue_as_new --- + + #[pg_test] + fn test_function_input_loop_iteration_serialization() { + use crate::types::FunctionInput; + + // Verify loop_iteration is preserved through serialization + let input = FunctionInput { + instance_id: "test123".to_string(), + label: Some("test".to_string()), + vars: std::collections::HashMap::new(), + loop_iteration: 42, + }; + let json = serde_json::to_string(&input).unwrap(); + let deserialized: FunctionInput = serde_json::from_str(&json).unwrap(); + assert_eq!( + deserialized.loop_iteration, 42, + "loop_iteration must survive serialization round-trip" + ); + } + + #[pg_test] + fn test_function_input_loop_iteration_defaults_to_zero() { + use crate::types::FunctionInput; + + // Verify backward compat: old FunctionInput JSON without loop_iteration + // deserializes with loop_iteration = 0 + let json = r#"{"instance_id":"abc12345","label":"test","vars":{}}"#; + let input: FunctionInput = serde_json::from_str(json).unwrap(); + assert_eq!( + input.loop_iteration, 0, + "Missing loop_iteration should default to 0 for backward compatibility" + ); + } + + // --- M8: Malformed loop condition config detection --- + + #[pg_test] + fn test_malformed_loop_condition_detected_at_validate() { + // A LOOP node whose condition_node is a plain string (not a Durofut object) + // should be rejected by validate_recursive because for_each_config_child + // requires condition_node to deserialize as a valid Durofut. + let node = Durofut { + node_type: "LOOP".to_string(), + left_node: Some(Box::new(Durofut { + node_type: "SQL".to_string(), + query: Some("SELECT 1".to_string()), + ..Default::default() + })), + // Malformed config: valid JSON but condition_node is a string, not a Durofut object. + query: Some(r#"{"condition_node": "nonexist"}"#.to_string()), + ..Default::default() + }; + // Validate should fail because condition_node is not a valid Durofut object + let err = node.validate_recursive().unwrap_err(); + assert!( + err.contains("condition_node"), + "Error should mention condition_node, got: {err}" + ); + + // But if the config is totally not JSON, for_each_config_child skips it + // (it's treated as a plain query string, not a config object). + let non_json_node = Durofut { + node_type: "LOOP".to_string(), + left_node: Some(Box::new(Durofut { + node_type: "SQL".to_string(), + query: Some("SELECT 1".to_string()), + ..Default::default() + })), + query: Some("this is not json at all!!!".to_string()), + ..Default::default() + }; + assert!( + non_json_node.validate_recursive().is_ok(), + "LOOP with non-JSON config passes DSL validation (caught at execution time)" + ); + } } /// Required by `cargo pgrx test` diff --git a/src/orchestrations/execute_function_graph.rs b/src/orchestrations/execute_function_graph.rs index fd17722..e593610 100644 --- a/src/orchestrations/execute_function_graph.rs +++ b/src/orchestrations/execute_function_graph.rs @@ -30,6 +30,8 @@ pub const SUBTREE_NAME: &str = "pg_durable::orchestration::execute-subtree"; struct ExecutionContext { vars: HashMap, label: Option, + /// Loop iteration counter (persisted across continue_as_new generations). + loop_iteration: u64, } /// Control-flow-aware error type returned by every node handler. @@ -181,6 +183,7 @@ pub async fn execute(ctx: OrchestrationContext, input_json: String) -> Result(config_str) { - if let Some(condition_node_id) = config["condition_node"].as_str() { - ctx.trace_info("Evaluating loop condition"); - let condition_result = Box::pin(execute_function_node_with_vars( - ctx, - graph, - condition_node_id, - results, - exec_ctx, - )) - .await?; - - // Parse condition result to check truthiness (uses evaluate_condition to extract boolean from SQL result) - let should_continue = evaluate_condition(&condition_result).unwrap_or(false); - ctx.trace_info(format!( - "Loop condition evaluated to: {condition_result} (continue={should_continue})" - )); - - if !should_continue { - ctx.trace_info("Loop condition false, exiting loop"); - store_named_result(ctx, node, &body_result, results, "LOOP"); - return Ok(body_result); + match serde_json::from_str::(config_str) { + Ok(config) => { + if let Some(condition_node_id) = config["condition_node"].as_str() { + ctx.trace_info("Evaluating loop condition"); + let condition_result = Box::pin(execute_function_node_with_vars( + ctx, + graph, + condition_node_id, + results, + exec_ctx, + )) + .await?; + + // Parse condition result to check truthiness (uses evaluate_condition to extract boolean from SQL result) + let should_continue = evaluate_condition(&condition_result).unwrap_or(false); + ctx.trace_info(format!( + "Loop condition evaluated to: {condition_result} (continue={should_continue})" + )); + + if !should_continue { + ctx.trace_info("Loop condition false, exiting loop"); + store_named_result(ctx, node, &body_result, results, "LOOP"); + return Ok(body_result); + } } } + Err(e) => { + // M8: Malformed condition config should fail the loop rather than + // silently creating an infinite loop without exit condition. + return Err(NodeError::Failure(format!( + "LOOP node {node_id}: failed to parse condition config: {e}" + ))); + } } } ctx.trace_info("Continuing as new for next loop iteration"); + // M7: Enforce maximum iteration count to prevent runaway infinite loops + let next_iteration = exec_ctx.loop_iteration + 1; + if next_iteration >= MAX_LOOP_ITERATIONS { + return Err(NodeError::Failure(format!( + "Loop exceeded maximum iteration count of {MAX_LOOP_ITERATIONS}. \ + Use df.break() to exit the loop or restructure the workflow." + ))); + } + // Enforce a minimum per-iteration wall-clock duration to prevent // busy-looping (e.g. `df.loop(df.sleep(0))`). Compute the elapsed time // from the deterministic clock; if the iteration finished faster than @@ -620,6 +649,7 @@ async fn execute_loop_node( instance_id: graph.instance_id.clone(), label: exec_ctx.label.clone(), vars: exec_ctx.vars.clone(), + loop_iteration: next_iteration, }; // duroxide 0.1.1: continue_as_new returns an awaitable future - return it directly diff --git a/src/types.rs b/src/types.rs index fb558f5..4fdab4f 100644 --- a/src/types.rs +++ b/src/types.rs @@ -895,6 +895,10 @@ pub struct FunctionInput { pub label: Option, #[serde(default)] pub vars: std::collections::HashMap, + /// Loop iteration counter, incremented on each `continue_as_new`. + /// Used to enforce a maximum iteration safeguard. + #[serde(default)] + pub loop_iteration: u64, } /// Configuration for HTTP requests