diff --git a/tests/reliability_tests.rs b/tests/reliability_tests.rs index 84559d3..622bf75 100644 --- a/tests/reliability_tests.rs +++ b/tests/reliability_tests.rs @@ -14,11 +14,15 @@ mod common; async fn external_duplicate_workitems_dedup() { let (store, _td) = common::create_sqlite_store_disk().await; - let orch = |ctx: OrchestrationContext, _input: String| async move { - let v = ctx.schedule_wait("Evt").await; - Ok(v) + let make_orch = || { + |ctx: OrchestrationContext, _input: String| async move { + let v = ctx.schedule_wait("Evt").await; + Ok(v) + } }; - let orchestration_registry = OrchestrationRegistry::builder().register("WaitEvt", orch).build(); + let orchestration_registry = OrchestrationRegistry::builder() + .register("WaitEvt", make_orch()) + .build(); let activity_registry = ActivityRegistry::builder().build(); let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await; let client = duroxide::Client::new(store.clone()); @@ -27,14 +31,26 @@ async fn external_duplicate_workitems_dedup() { client.start_orchestration(inst, "WaitEvt", "").await.unwrap(); assert!(common::wait_for_subscription(store.clone(), inst, "Evt", 2_000).await); - // enqueue duplicate externals + // Force the same-scoop case: both duplicate external arrivals are visible + // before the dispatcher fetches either of them. + rt.shutdown(None).await; + let wi = WorkItem::ExternalRaised { instance: inst.to_string(), name: "Evt".to_string(), data: "ok".to_string(), }; - let _ = store.enqueue_for_orchestrator(wi.clone(), None).await; - let _ = store.enqueue_for_orchestrator(wi.clone(), None).await; + store.enqueue_for_orchestrator(wi.clone(), None).await.unwrap(); + store.enqueue_for_orchestrator(wi.clone(), None).await.unwrap(); + + let rt = runtime::Runtime::start_with_store( + store.clone(), + ActivityRegistry::builder().build(), + OrchestrationRegistry::builder() + .register("WaitEvt", make_orch()) + .build(), + ) + .await; // wait for completion let ok = common::wait_for_history( @@ -49,9 +65,8 @@ async fn external_duplicate_workitems_dedup() { .await; assert!(ok, "timeout waiting for completion"); - // Both external events are materialized in history (unconditional materialization). - // Only the first is delivered to the subscription (causal check); the second - // has no pending subscription slot and is a no-op at replay time. + // Same scoop: both external arrivals are materialized in history. Replay + // delivers one to the pending wait and causally ignores the other. let hist = store.read(inst).await.unwrap_or_default(); let external_events: Vec<&Event> = hist .iter() @@ -229,19 +244,22 @@ async fn activity_duplicate_completion_workitems_dedup() { } // merged file: imports above already declared; avoid reimporting -// Simulate crash windows by interleaving dequeue and persistence. -// We approximate by injecting duplicates around the same window; idempotence + peek-lock should ensure correctness. +// Simulate crash windows by controlling provider dequeue/ack boundaries directly. #[tokio::test] async fn crash_after_dequeue_before_append_completion() { let (store, _td) = common::create_sqlite_store_disk().await; - let orch = |ctx: OrchestrationContext, _input: String| async move { - // Wait for external then complete with payload - let v = ctx.schedule_wait("Evt").await; - Ok(v) + let make_orch = || { + |ctx: OrchestrationContext, _input: String| async move { + // Wait for external then complete with payload + let v = ctx.schedule_wait("Evt").await; + Ok(v) + } }; - let orchestration_registry = OrchestrationRegistry::builder().register("WaitEvt", orch).build(); + let orchestration_registry = OrchestrationRegistry::builder() + .register("WaitEvt", make_orch()) + .build(); let activity_registry = ActivityRegistry::builder().build(); let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await; let client = duroxide::Client::new(store.clone()); @@ -251,17 +269,46 @@ async fn crash_after_dequeue_before_append_completion() { client.start_orchestration(inst, "WaitEvt", "").await.unwrap(); assert!(common::wait_for_subscription(store.clone(), inst, "Evt", 2_000).await); - // Enqueue the external work item + // Stop the runtime after the subscription is recorded so this test owns the + // crash window instead of racing the dispatcher. + rt.shutdown(None).await; + let wi = WorkItem::ExternalRaised { instance: inst.to_string(), name: "Evt".to_string(), data: "ok".to_string(), }; - let _ = store.enqueue_for_orchestrator(wi.clone(), None).await; - // Simulate crash-before-append by enqueuing duplicate before runtime gets to append - let _ = store.enqueue_for_orchestrator(wi.clone(), None).await; - // Wait for completion, ensure a single ExternalEvent recorded + // First copy is dequeued and locked, but never acked. This simulates a + // process crash after dequeue and before the ExternalEvent is appended. + let lock_timeout = Duration::from_millis(100); + store.enqueue_for_orchestrator(wi.clone(), None).await.unwrap(); + let (fetched, _lock_token, _attempt_count) = store + .fetch_orchestration_item(lock_timeout, Duration::ZERO, None) + .await + .unwrap() + .expect("first external event should be fetched"); + assert_eq!(fetched.messages.len(), 1); + assert!(matches!(fetched.messages[0], WorkItem::ExternalRaised { .. })); + + // After the unacked dequeue's lock expires, restart with only the recovered + // row visible. A later duplicate is enqueued after this first scoop completes + // the orchestration, which verifies different-scoop behavior after terminal. + tokio::time::sleep(lock_timeout + Duration::from_millis(50)).await; + let rt = runtime::Runtime::start_with_options( + store.clone(), + ActivityRegistry::builder().build(), + OrchestrationRegistry::builder() + .register("WaitEvt", make_orch()) + .build(), + runtime::RuntimeOptions { + orchestrator_lock_timeout: lock_timeout, + ..Default::default() + }, + ) + .await; + + // Wait for completion. let ok = common::wait_for_history( store.clone(), inst, @@ -273,16 +320,47 @@ async fn crash_after_dequeue_before_append_completion() { ) .await; assert!(ok, "timeout waiting for completion"); - // Both external events are materialized in history (unconditional materialization). - // Only the first is delivered (causal check); the duplicate is a no-op at replay time. + + // Different scoop after terminal: a later duplicate arrival should be acked + // without appending another ExternalEvent to the completed orchestration. + store.enqueue_for_orchestrator(wi.clone(), None).await.unwrap(); + for _ in 0..100 { + let depths = store + .as_management_capability() + .unwrap() + .get_queue_depths() + .await + .unwrap(); + if depths.orchestrator_queue == 0 { + break; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + assert_eq!( + store + .as_management_capability() + .unwrap() + .get_queue_depths() + .await + .unwrap() + .orchestrator_queue, + 0 + ); + + rt.shutdown(None).await; + tokio::time::sleep(lock_timeout + Duration::from_millis(50)).await; + let refetched = store + .fetch_orchestration_item(lock_timeout, Duration::ZERO, None) + .await + .unwrap(); + assert!(refetched.is_none(), "late duplicate should have been acked"); + let hist = store.read(inst).await.unwrap_or_default(); let evs: Vec<&Event> = hist .iter() .filter(|e| matches!(&e.kind, EventKind::ExternalEvent { .. })) .collect(); - assert_eq!(evs.len(), 2); - - rt.shutdown(None).await; + assert_eq!(evs.len(), 1); } #[tokio::test]