From b66720efcb84c852b145340cad191d10c1862ddc Mon Sep 17 00:00:00 2001 From: Pino de Candia Date: Mon, 18 May 2026 09:28:47 -0500 Subject: [PATCH 1/4] fix: eliminate race condition in crash_after_dequeue_before_append_completion test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The test had a timing race: the runtime could dequeue and process the first ExternalRaised (completing the orchestration) before the second was enqueued. When the second arrived in a subsequent batch, the orchestration was already terminal, so the dispatcher acked it without materializing — resulting in 1 ExternalEvent instead of the expected 2. Fix: shut down the runtime after the subscription is confirmed, enqueue both duplicates while the dispatcher is stopped, then restart. This guarantees both land in the same batch, faithfully simulating the real crash window (where lock expiry makes both copies visible simultaneously). --- tests/reliability_tests.rs | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/tests/reliability_tests.rs b/tests/reliability_tests.rs index 84559d3..56821ed 100644 --- a/tests/reliability_tests.rs +++ b/tests/reliability_tests.rs @@ -236,12 +236,14 @@ async fn activity_duplicate_completion_workitems_dedup() { async fn crash_after_dequeue_before_append_completion() { let (store, _td) = common::create_sqlite_store_disk().await; - let orch = |ctx: OrchestrationContext, _input: String| async move { - // Wait for external then complete with payload - let v = ctx.schedule_wait("Evt").await; - Ok(v) + let make_orch = || { + |ctx: OrchestrationContext, _input: String| async move { + // Wait for external then complete with payload + let v = ctx.schedule_wait("Evt").await; + Ok(v) + } }; - let orchestration_registry = OrchestrationRegistry::builder().register("WaitEvt", orch).build(); + let orchestration_registry = OrchestrationRegistry::builder().register("WaitEvt", make_orch()).build(); let activity_registry = ActivityRegistry::builder().build(); let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await; let client = duroxide::Client::new(store.clone()); @@ -251,16 +253,29 @@ async fn crash_after_dequeue_before_append_completion() { client.start_orchestration(inst, "WaitEvt", "").await.unwrap(); assert!(common::wait_for_subscription(store.clone(), inst, "Evt", 2_000).await); - // Enqueue the external work item + // Stop the runtime so both duplicates land in the queue before processing. + // This guarantees the dispatcher picks them up in one batch (no race). + rt.shutdown(None).await; + + // Enqueue the external work item twice (simulates crash-before-append: + // the item was dequeued but the completion was never persisted, so after + // lock expiry the item becomes visible again alongside a new copy). let wi = WorkItem::ExternalRaised { instance: inst.to_string(), name: "Evt".to_string(), data: "ok".to_string(), }; let _ = store.enqueue_for_orchestrator(wi.clone(), None).await; - // Simulate crash-before-append by enqueuing duplicate before runtime gets to append let _ = store.enqueue_for_orchestrator(wi.clone(), None).await; + // Restart runtime — dispatcher picks up both events in one batch + let rt = runtime::Runtime::start_with_store( + store.clone(), + ActivityRegistry::builder().build(), + OrchestrationRegistry::builder().register("WaitEvt", make_orch()).build(), + ) + .await; + // Wait for completion, ensure a single ExternalEvent recorded let ok = common::wait_for_history( store.clone(), From 085078303cbd8e052a48128bad3dbede9d00b14a Mon Sep 17 00:00:00 2001 From: Pino de Candia <32303022+pinodeca@users.noreply.github.com> Date: Mon, 18 May 2026 15:19:52 +0000 Subject: [PATCH 2/4] test: simulate crash-before-append via lock expiry --- tests/reliability_tests.rs | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/tests/reliability_tests.rs b/tests/reliability_tests.rs index 56821ed..d9ca962 100644 --- a/tests/reliability_tests.rs +++ b/tests/reliability_tests.rs @@ -253,22 +253,36 @@ async fn crash_after_dequeue_before_append_completion() { client.start_orchestration(inst, "WaitEvt", "").await.unwrap(); assert!(common::wait_for_subscription(store.clone(), inst, "Evt", 2_000).await); - // Stop the runtime so both duplicates land in the queue before processing. - // This guarantees the dispatcher picks them up in one batch (no race). + // Stop the runtime after the subscription is recorded so this test owns the + // crash window instead of racing the dispatcher. rt.shutdown(None).await; - // Enqueue the external work item twice (simulates crash-before-append: - // the item was dequeued but the completion was never persisted, so after - // lock expiry the item becomes visible again alongside a new copy). let wi = WorkItem::ExternalRaised { instance: inst.to_string(), name: "Evt".to_string(), data: "ok".to_string(), }; - let _ = store.enqueue_for_orchestrator(wi.clone(), None).await; - let _ = store.enqueue_for_orchestrator(wi.clone(), None).await; - // Restart runtime — dispatcher picks up both events in one batch + // First copy is dequeued and locked, but never acked. This simulates a + // process crash after dequeue and before the ExternalEvent is appended. + let lock_timeout = Duration::from_millis(100); + store.enqueue_for_orchestrator(wi.clone(), None).await.unwrap(); + let (fetched, _lock_token, _attempt_count) = store + .fetch_orchestration_item(lock_timeout, Duration::ZERO, None) + .await + .unwrap() + .expect("first external event should be fetched"); + assert_eq!(fetched.messages.len(), 1); + assert!(matches!(fetched.messages[0], WorkItem::ExternalRaised { .. })); + + // After the unacked dequeue's lock expires, a duplicate/new copy arrives. + // Restarting the runtime should fetch both the recovered locked row and the + // fresh visible row in one batch. + tokio::time::sleep(lock_timeout + Duration::from_millis(50)).await; + store.enqueue_for_orchestrator(wi.clone(), None).await.unwrap(); + + // Restart runtime — dispatcher picks up the expired locked row and the new + // duplicate in one batch. let rt = runtime::Runtime::start_with_store( store.clone(), ActivityRegistry::builder().build(), @@ -276,7 +290,7 @@ async fn crash_after_dequeue_before_append_completion() { ) .await; - // Wait for completion, ensure a single ExternalEvent recorded + // Wait for completion. let ok = common::wait_for_history( store.clone(), inst, From 26c6c6769640138b4172b84237b2219994beb550 Mon Sep 17 00:00:00 2001 From: Pino de Candia <32303022+pinodeca@users.noreply.github.com> Date: Mon, 18 May 2026 19:25:16 +0000 Subject: [PATCH 3/4] test: clarify crash recovery scenario comments --- tests/reliability_tests.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/tests/reliability_tests.rs b/tests/reliability_tests.rs index d9ca962..6227517 100644 --- a/tests/reliability_tests.rs +++ b/tests/reliability_tests.rs @@ -229,8 +229,9 @@ async fn activity_duplicate_completion_workitems_dedup() { } // merged file: imports above already declared; avoid reimporting -// Simulate crash windows by interleaving dequeue and persistence. -// We approximate by injecting duplicates around the same window; idempotence + peek-lock should ensure correctness. +// Simulate crash windows by controlling provider dequeue/ack boundaries directly. +// The lock-expiry case below verifies that a recovered queue row can be processed +// together with a second visible external event for the same orchestration. #[tokio::test] async fn crash_after_dequeue_before_append_completion() { @@ -275,14 +276,14 @@ async fn crash_after_dequeue_before_append_completion() { assert_eq!(fetched.messages.len(), 1); assert!(matches!(fetched.messages[0], WorkItem::ExternalRaised { .. })); - // After the unacked dequeue's lock expires, a duplicate/new copy arrives. - // Restarting the runtime should fetch both the recovered locked row and the - // fresh visible row in one batch. + // After the unacked dequeue's lock expires, a duplicate external arrival is + // enqueued. Restarting the runtime should fetch both the recovered row and + // the second visible row in one batch. tokio::time::sleep(lock_timeout + Duration::from_millis(50)).await; store.enqueue_for_orchestrator(wi.clone(), None).await.unwrap(); - // Restart runtime — dispatcher picks up the expired locked row and the new - // duplicate in one batch. + // Restart runtime: the dispatcher picks up the expired locked row and the + // second visible row together. let rt = runtime::Runtime::start_with_store( store.clone(), ActivityRegistry::builder().build(), @@ -302,8 +303,8 @@ async fn crash_after_dequeue_before_append_completion() { ) .await; assert!(ok, "timeout waiting for completion"); - // Both external events are materialized in history (unconditional materialization). - // Only the first is delivered (causal check); the duplicate is a no-op at replay time. + // Both external events are materialized in history. Replay delivers one to + // the pending wait; the second has no matching wait and is causally ignored. let hist = store.read(inst).await.unwrap_or_default(); let evs: Vec<&Event> = hist .iter() From a554c31a7edbde1217e97c0d8ee9779c9fca66db Mon Sep 17 00:00:00 2001 From: Pino de Candia <32303022+pinodeca@users.noreply.github.com> Date: Mon, 18 May 2026 22:21:26 +0000 Subject: [PATCH 4/4] test: split external event duplicate race cases --- tests/reliability_tests.rs | 102 +++++++++++++++++++++++++++---------- 1 file changed, 75 insertions(+), 27 deletions(-) diff --git a/tests/reliability_tests.rs b/tests/reliability_tests.rs index 6227517..622bf75 100644 --- a/tests/reliability_tests.rs +++ b/tests/reliability_tests.rs @@ -14,11 +14,15 @@ mod common; async fn external_duplicate_workitems_dedup() { let (store, _td) = common::create_sqlite_store_disk().await; - let orch = |ctx: OrchestrationContext, _input: String| async move { - let v = ctx.schedule_wait("Evt").await; - Ok(v) + let make_orch = || { + |ctx: OrchestrationContext, _input: String| async move { + let v = ctx.schedule_wait("Evt").await; + Ok(v) + } }; - let orchestration_registry = OrchestrationRegistry::builder().register("WaitEvt", orch).build(); + let orchestration_registry = OrchestrationRegistry::builder() + .register("WaitEvt", make_orch()) + .build(); let activity_registry = ActivityRegistry::builder().build(); let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await; let client = duroxide::Client::new(store.clone()); @@ -27,14 +31,26 @@ async fn external_duplicate_workitems_dedup() { client.start_orchestration(inst, "WaitEvt", "").await.unwrap(); assert!(common::wait_for_subscription(store.clone(), inst, "Evt", 2_000).await); - // enqueue duplicate externals + // Force the same-scoop case: both duplicate external arrivals are visible + // before the dispatcher fetches either of them. + rt.shutdown(None).await; + let wi = WorkItem::ExternalRaised { instance: inst.to_string(), name: "Evt".to_string(), data: "ok".to_string(), }; - let _ = store.enqueue_for_orchestrator(wi.clone(), None).await; - let _ = store.enqueue_for_orchestrator(wi.clone(), None).await; + store.enqueue_for_orchestrator(wi.clone(), None).await.unwrap(); + store.enqueue_for_orchestrator(wi.clone(), None).await.unwrap(); + + let rt = runtime::Runtime::start_with_store( + store.clone(), + ActivityRegistry::builder().build(), + OrchestrationRegistry::builder() + .register("WaitEvt", make_orch()) + .build(), + ) + .await; // wait for completion let ok = common::wait_for_history( @@ -49,9 +65,8 @@ async fn external_duplicate_workitems_dedup() { .await; assert!(ok, "timeout waiting for completion"); - // Both external events are materialized in history (unconditional materialization). - // Only the first is delivered to the subscription (causal check); the second - // has no pending subscription slot and is a no-op at replay time. + // Same scoop: both external arrivals are materialized in history. Replay + // delivers one to the pending wait and causally ignores the other. let hist = store.read(inst).await.unwrap_or_default(); let external_events: Vec<&Event> = hist .iter() @@ -230,8 +245,6 @@ async fn activity_duplicate_completion_workitems_dedup() { // merged file: imports above already declared; avoid reimporting // Simulate crash windows by controlling provider dequeue/ack boundaries directly. -// The lock-expiry case below verifies that a recovered queue row can be processed -// together with a second visible external event for the same orchestration. #[tokio::test] async fn crash_after_dequeue_before_append_completion() { @@ -244,7 +257,9 @@ async fn crash_after_dequeue_before_append_completion() { Ok(v) } }; - let orchestration_registry = OrchestrationRegistry::builder().register("WaitEvt", make_orch()).build(); + let orchestration_registry = OrchestrationRegistry::builder() + .register("WaitEvt", make_orch()) + .build(); let activity_registry = ActivityRegistry::builder().build(); let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await; let client = duroxide::Client::new(store.clone()); @@ -276,18 +291,20 @@ async fn crash_after_dequeue_before_append_completion() { assert_eq!(fetched.messages.len(), 1); assert!(matches!(fetched.messages[0], WorkItem::ExternalRaised { .. })); - // After the unacked dequeue's lock expires, a duplicate external arrival is - // enqueued. Restarting the runtime should fetch both the recovered row and - // the second visible row in one batch. + // After the unacked dequeue's lock expires, restart with only the recovered + // row visible. A later duplicate is enqueued after this first scoop completes + // the orchestration, which verifies different-scoop behavior after terminal. tokio::time::sleep(lock_timeout + Duration::from_millis(50)).await; - store.enqueue_for_orchestrator(wi.clone(), None).await.unwrap(); - - // Restart runtime: the dispatcher picks up the expired locked row and the - // second visible row together. - let rt = runtime::Runtime::start_with_store( + let rt = runtime::Runtime::start_with_options( store.clone(), ActivityRegistry::builder().build(), - OrchestrationRegistry::builder().register("WaitEvt", make_orch()).build(), + OrchestrationRegistry::builder() + .register("WaitEvt", make_orch()) + .build(), + runtime::RuntimeOptions { + orchestrator_lock_timeout: lock_timeout, + ..Default::default() + }, ) .await; @@ -303,16 +320,47 @@ async fn crash_after_dequeue_before_append_completion() { ) .await; assert!(ok, "timeout waiting for completion"); - // Both external events are materialized in history. Replay delivers one to - // the pending wait; the second has no matching wait and is causally ignored. + + // Different scoop after terminal: a later duplicate arrival should be acked + // without appending another ExternalEvent to the completed orchestration. + store.enqueue_for_orchestrator(wi.clone(), None).await.unwrap(); + for _ in 0..100 { + let depths = store + .as_management_capability() + .unwrap() + .get_queue_depths() + .await + .unwrap(); + if depths.orchestrator_queue == 0 { + break; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + assert_eq!( + store + .as_management_capability() + .unwrap() + .get_queue_depths() + .await + .unwrap() + .orchestrator_queue, + 0 + ); + + rt.shutdown(None).await; + tokio::time::sleep(lock_timeout + Duration::from_millis(50)).await; + let refetched = store + .fetch_orchestration_item(lock_timeout, Duration::ZERO, None) + .await + .unwrap(); + assert!(refetched.is_none(), "late duplicate should have been acked"); + let hist = store.read(inst).await.unwrap_or_default(); let evs: Vec<&Event> = hist .iter() .filter(|e| matches!(&e.kind, EventKind::ExternalEvent { .. })) .collect(); - assert_eq!(evs.len(), 2); - - rt.shutdown(None).await; + assert_eq!(evs.len(), 1); } #[tokio::test]