Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 105 additions & 27 deletions tests/reliability_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@ mod common;
async fn external_duplicate_workitems_dedup() {
let (store, _td) = common::create_sqlite_store_disk().await;

let orch = |ctx: OrchestrationContext, _input: String| async move {
let v = ctx.schedule_wait("Evt").await;
Ok(v)
let make_orch = || {
|ctx: OrchestrationContext, _input: String| async move {
let v = ctx.schedule_wait("Evt").await;
Ok(v)
}
};
let orchestration_registry = OrchestrationRegistry::builder().register("WaitEvt", orch).build();
let orchestration_registry = OrchestrationRegistry::builder()
.register("WaitEvt", make_orch())
.build();
let activity_registry = ActivityRegistry::builder().build();
let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
let client = duroxide::Client::new(store.clone());
Expand All @@ -27,14 +31,26 @@ async fn external_duplicate_workitems_dedup() {
client.start_orchestration(inst, "WaitEvt", "").await.unwrap();
assert!(common::wait_for_subscription(store.clone(), inst, "Evt", 2_000).await);

// enqueue duplicate externals
// Force the same-scoop case: both duplicate external arrivals are visible
// before the dispatcher fetches either of them.
rt.shutdown(None).await;

let wi = WorkItem::ExternalRaised {
instance: inst.to_string(),
name: "Evt".to_string(),
data: "ok".to_string(),
};
let _ = store.enqueue_for_orchestrator(wi.clone(), None).await;
let _ = store.enqueue_for_orchestrator(wi.clone(), None).await;
store.enqueue_for_orchestrator(wi.clone(), None).await.unwrap();
store.enqueue_for_orchestrator(wi.clone(), None).await.unwrap();

let rt = runtime::Runtime::start_with_store(
store.clone(),
ActivityRegistry::builder().build(),
OrchestrationRegistry::builder()
.register("WaitEvt", make_orch())
.build(),
)
.await;

// wait for completion
let ok = common::wait_for_history(
Expand All @@ -49,9 +65,8 @@ async fn external_duplicate_workitems_dedup() {
.await;
assert!(ok, "timeout waiting for completion");

// Both external events are materialized in history (unconditional materialization).
// Only the first is delivered to the subscription (causal check); the second
// has no pending subscription slot and is a no-op at replay time.
// Same scoop: both external arrivals are materialized in history. Replay
// delivers one to the pending wait and causally ignores the other.
let hist = store.read(inst).await.unwrap_or_default();
let external_events: Vec<&Event> = hist
.iter()
Expand Down Expand Up @@ -229,19 +244,22 @@ async fn activity_duplicate_completion_workitems_dedup() {
}
// merged file: imports above already declared; avoid reimporting

// Simulate crash windows by interleaving dequeue and persistence.
// We approximate by injecting duplicates around the same window; idempotence + peek-lock should ensure correctness.
// Simulate crash windows by controlling provider dequeue/ack boundaries directly.

#[tokio::test]
async fn crash_after_dequeue_before_append_completion() {
let (store, _td) = common::create_sqlite_store_disk().await;

let orch = |ctx: OrchestrationContext, _input: String| async move {
// Wait for external then complete with payload
let v = ctx.schedule_wait("Evt").await;
Ok(v)
let make_orch = || {
|ctx: OrchestrationContext, _input: String| async move {
// Wait for external then complete with payload
let v = ctx.schedule_wait("Evt").await;
Ok(v)
}
};
let orchestration_registry = OrchestrationRegistry::builder().register("WaitEvt", orch).build();
let orchestration_registry = OrchestrationRegistry::builder()
.register("WaitEvt", make_orch())
.build();
let activity_registry = ActivityRegistry::builder().build();
let rt = runtime::Runtime::start_with_store(store.clone(), activity_registry, orchestration_registry).await;
let client = duroxide::Client::new(store.clone());
Expand All @@ -251,17 +269,46 @@ async fn crash_after_dequeue_before_append_completion() {
client.start_orchestration(inst, "WaitEvt", "").await.unwrap();
assert!(common::wait_for_subscription(store.clone(), inst, "Evt", 2_000).await);

// Enqueue the external work item
// Stop the runtime after the subscription is recorded so this test owns the
// crash window instead of racing the dispatcher.
rt.shutdown(None).await;

let wi = WorkItem::ExternalRaised {
instance: inst.to_string(),
name: "Evt".to_string(),
data: "ok".to_string(),
};
let _ = store.enqueue_for_orchestrator(wi.clone(), None).await;
// Simulate crash-before-append by enqueuing duplicate before runtime gets to append
let _ = store.enqueue_for_orchestrator(wi.clone(), None).await;

// Wait for completion, ensure a single ExternalEvent recorded
// First copy is dequeued and locked, but never acked. This simulates a
// process crash after dequeue and before the ExternalEvent is appended.
let lock_timeout = Duration::from_millis(100);
store.enqueue_for_orchestrator(wi.clone(), None).await.unwrap();
let (fetched, _lock_token, _attempt_count) = store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.expect("first external event should be fetched");
assert_eq!(fetched.messages.len(), 1);
assert!(matches!(fetched.messages[0], WorkItem::ExternalRaised { .. }));

// After the unacked dequeue's lock expires, restart the runtime with only the
// recovered row visible. A later duplicate is enqueued only after this first
// scoop completes the orchestration, which verifies different-scoop behavior
// once the instance has reached a terminal state.
tokio::time::sleep(lock_timeout + Duration::from_millis(50)).await;
let rt = runtime::Runtime::start_with_options(
store.clone(),
ActivityRegistry::builder().build(),
OrchestrationRegistry::builder()
.register("WaitEvt", make_orch())
.build(),
runtime::RuntimeOptions {
orchestrator_lock_timeout: lock_timeout,
..Default::default()
},
)
.await;

// Wait for completion.
let ok = common::wait_for_history(
store.clone(),
inst,
Expand All @@ -273,16 +320,47 @@ async fn crash_after_dequeue_before_append_completion() {
)
.await;
assert!(ok, "timeout waiting for completion");
// Both external events are materialized in history (unconditional materialization).
// Only the first is delivered (causal check); the duplicate is a no-op at replay time.

// Different scoop, after the orchestration is terminal: a late duplicate
// arrival should be acked without appending another ExternalEvent to the
// completed orchestration's history.
store.enqueue_for_orchestrator(wi.clone(), None).await.unwrap();
for _ in 0..100 {
let depths = store
.as_management_capability()
.unwrap()
.get_queue_depths()
.await
.unwrap();
if depths.orchestrator_queue == 0 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert_eq!(
store
.as_management_capability()
.unwrap()
.get_queue_depths()
.await
.unwrap()
.orchestrator_queue,
0
);

rt.shutdown(None).await;
tokio::time::sleep(lock_timeout + Duration::from_millis(50)).await;
let refetched = store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap();
assert!(refetched.is_none(), "late duplicate should have been acked");

let hist = store.read(inst).await.unwrap_or_default();
let evs: Vec<&Event> = hist
.iter()
.filter(|e| matches!(&e.kind, EventKind::ExternalEvent { .. }))
.collect();
assert_eq!(evs.len(), 2);

rt.shutdown(None).await;
assert_eq!(evs.len(), 1);
}

#[tokio::test]
Expand Down
Loading