From 7377a04066300268e33496e424a1efc4ce905df5 Mon Sep 17 00:00:00 2001 From: Pino de Candia <32303022+pinodeca@users.noreply.github.com> Date: Thu, 23 Apr 2026 22:12:18 +0000 Subject: [PATCH] feat(limits): implement size & shape limits (slices 1-10) Add MAX_HISTORY_BYTES (5 MiB), MAX_PAYLOAD_BYTES (3 MiB), and MAX_SMALL_VALUE_BYTES (64 KiB). Enforcement is opt-in via two new RuntimeOptions toggles (enforce_size_limits, emit_limit_exceeded_errors), both defaulting false so upgrades preserve existing behavior. - Tier-1/2/3 checks at client, orchestration turn, and worker - Always-on: history_size_bytes(), history_pressure(), metrics - 16 new scenario tests; docs updated --- Cargo.toml | 4 + TODO.md | 4 +- docs/ORCHESTRATION-GUIDE.md | 92 ++- docs/continue-as-new.md | 66 ++ docs/proposals-impl/PROGRESS-size-limits.md | 147 ++++ docs/proposals/core-improvements-roadmap.md | 11 +- docs/proposals/size-limits.md | 634 ++++++++++++++ src/client/mod.rs | 196 ++++- src/lib.rs | 69 ++ src/runtime/dispatchers/orchestration.rs | 356 +++++++- src/runtime/dispatchers/worker.rs | 117 +++ src/runtime/limits.rs | 54 ++ src/runtime/mod.rs | 97 +++ src/runtime/observability.rs | 77 ++ src/runtime/replay_engine.rs | 10 + src/runtime/state_helpers.rs | 71 ++ tests/scenarios.rs | 3 + tests/scenarios/limits.rs | 871 ++++++++++++++++++++ 18 files changed, 2845 insertions(+), 34 deletions(-) create mode 100644 docs/proposals-impl/PROGRESS-size-limits.md create mode 100644 docs/proposals/size-limits.md create mode 100644 tests/scenarios/limits.rs diff --git a/Cargo.toml b/Cargo.toml index 0ff6345..3144271 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,10 @@ tracing = { version = "0.1", features = ["std"] } tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter", "json"] } semver = "1" +# Forensic hashing of oversized values (logged size + hash, never raw bytes; +# see docs/proposals/size-limits.md § 5.1 tier-3). +blake3 = "1" + # Metrics facade (always on, zero-cost if no recorder installed) metrics = "0.24" diff --git a/TODO.md b/TODO.md index fad7d6e..2fb3cca 100644 --- a/TODO.md +++ b/TODO.md @@ -2,7 +2,7 @@ ### Active TODOs -- Size limits +- ~~Size limits~~ *(implemented 2026-05-06)* - RaiseEvent pub/sub - **Stale activity cleanup / Activity TTL** - Tagged activities that no matching worker picks up sit in the worker queue indefinitely @@ -58,7 +58,7 @@ - move stress tests to cargo standard "bench" - separate execution loops for orchestrations and activities, communicate through channels - Port samples from DurableTasks and Temporal to tests/scenarios/ -- Limits everywhere, orch/activity names, input/output event sizes, history sizes etc. +- ~~Limits everywhere, orch/activity names, input/output event sizes, history sizes etc.~~ *(implemented 2026-05-06)* - LLM-orchestration/provider - Revive batching from dispatcher-batching branch, currently the perf dropped drastically. Might've been a sqlite-only issue. - Mgmt API feedback diff --git a/docs/ORCHESTRATION-GUIDE.md b/docs/ORCHESTRATION-GUIDE.md index 33945a8..2d0ec04 100644 --- a/docs/ORCHESTRATION-GUIDE.md +++ b/docs/ORCHESTRATION-GUIDE.md @@ -2561,7 +2561,8 @@ async fn order_actor(ctx: OrchestrationContext, state_json: String) -> Result<() ### 1. Minimize History Size -Each orchestration turn appends events. Keep history manageable: +Each orchestration turn appends events. Keep history manageable with +`continue_as_new()`. Use `ctx.history_pressure()` to roll over proactively: ```rust // ❌ Bad - Creates millions of events @@ -2574,18 +2575,23 @@ async fn bad_loop(ctx: OrchestrationContext, _input: String) -> Result Result { let state: State = serde_json::from_str(&state_json)?; - + + // Proactively continue-as-new before hitting the 5 MiB history cap. + if ctx.history_pressure() >= 0.80 { + return ctx.continue_as_new(serde_json::to_string(&state)?).await; + } + // Process batch of 100 for i in 0..100 { ctx.schedule_activity("Process", (state.offset + i).to_string()) .await?; } - + state.offset += 100; - + if state.offset < state.total { return ctx.continue_as_new(serde_json::to_string(&state)?).await; // Fresh history } else { @@ -2594,6 +2600,8 @@ async fn good_loop(ctx: OrchestrationContext, state_json: String) -> Result Result<( --- +## Size Limits + +The runtime enforces hard caps on all values that pass through history or the provider. +These caps are `pub const` values in `src/runtime/limits.rs` — they are **not +configurable at runtime**. + +| Constant | Value | Applies to | +|---|---|---| +| `MAX_HISTORY_BYTES` | 5 MiB | Total serialized history per orchestration execution | +| `MAX_PAYLOAD_BYTES` | 3 MiB | Activity inputs/outputs, orchestration inputs, sub-orch inputs/results, external event payloads, continue-as-new inputs | +| `MAX_SMALL_VALUE_BYTES` | 64 KiB | Names (activity, sub-orch, orchestration), identifiers (instance ID, event name, queue name), error strings, cancel reasons | +| `MAX_CUSTOM_STATUS_BYTES` | 64 KiB | Custom status strings | +| `MAX_KV_VALUE_BYTES` | 16 KiB | Individual KV store values | +| `MAX_KV_KEYS` | 100 | KV keys per instance | +| `MAX_TAG_NAME_BYTES` | 256 | Activity tag names | + +### Enforcement is opt-in (two toggles) + +Both toggles default to `false` so upgrades are safe and no existing orchestration +breaks without warning. + +| `RuntimeOptions` field | Default | Effect when `true` | +|---|---|---| +| `enforce_size_limits` | `false` | Fails activities/turns that would exceed a cap | +| `emit_limit_exceeded_errors` | `false` | Produces `ConfigErrorKind::LimitExceeded` instead of `Application::OrchestrationFailed` | + +```rust +let runtime = Runtime::builder(provider) + .with_options(RuntimeOptions { + enforce_size_limits: true, + emit_limit_exceeded_errors: true, // optional — cleaner error shape + ..Default::default() + }) + .build(); +``` + +### Where enforcement happens + +| Tier | When | What is checked | +|---|---|---| +| **Tier 1 — Client** | `start_orchestration`, `raise_event`, `enqueue_event`, `cancel_instance` | Instance ID, orch name, event name, queue name, input/payload sizes | +| **Tier 2 — Orchestration turn** | On ack, before persisting | Activity/sub-orch name & input, orch output, sub-orch result, CAN input, error strings; aggregate history cap pre-persist | +| **Tier 3 — Activity worker** | After activity returns | Activity output size; error strings truncated (never fail) | + +When a tier-2 history-cap violation fires, the runtime **drops all side-effects** +(scheduled activities, timers, enqueued events) and writes only a terminal +`OrchestrationFailed` event, preventing the oversized data from ever reaching the +provider. + +### Anticipating the history cap from orchestration code + +Use `ctx.history_pressure()` (see [Performance Considerations](#1-minimize-history-size) +and [`docs/continue-as-new.md`](./continue-as-new.md#anticipating-the-history-limit)) +to roll over before hitting the hard cap: + +```rust +if ctx.history_pressure() >= 0.80 { + return ctx.continue_as_new(compact_state).await; +} +``` + +### Observability + +Three metrics are always emitted (regardless of enforcement): + +| Metric | Labels | Description | +|---|---|---| +| `duroxide_history_bytes` | `orchestration_name` | Serialized history size per turn (histogram) | +| `duroxide_payload_bytes` | `kind` | Per-payload-site byte histogram | +| `duroxide_limit_violations_total` | `resource` | Counter incremented on each violation | +| `duroxide_history_terminated_oversize_total` | `orchestration_name` | Counter when history cap terminates an instance | + +--- + ## Common Gotchas ### 1. Non-Deterministic Control Flow diff --git a/docs/continue-as-new.md b/docs/continue-as-new.md index bf7f289..1d1b5d9 100644 --- a/docs/continue-as-new.md +++ b/docs/continue-as-new.md @@ -88,3 +88,69 @@ client.prune_executions("my-instance", PruneOptions { - Pruning can be done while the workflow is actively running **Note on `keep_last` semantics:** Since the current execution is always protected and is always the highest execution_id, `keep_last: None`, `Some(0)`, and `Some(1)` are all equivalent—they all prune down to exactly the current execution. Use `None` for clarity when you want to prune all historical executions. + +## Anticipating the History Limit + +Every event appended to an orchestration's history consumes space. The runtime hard-caps +history at **5 MiB** (`MAX_HISTORY_BYTES`). When an orchestration's persisted history +reaches that cap its next turn is terminated with an error — so it's better to roll over +before reaching the limit. + +`OrchestrationContext` exposes two accessors for this purpose: + +```rust +/// Serialized byte size of history at the start of this turn (replay-safe). +pub fn history_size_bytes(&self) -> usize + +/// history_size_bytes() / MAX_HISTORY_BYTES, clamped to 0.0..=1.0. +pub fn history_pressure(&self) -> f32 +``` + +Both values are set once per turn from the baseline history that existed when the turn +started. They are **deterministic across replay** — the replay engine sets them before +invoking the orchestration function, so every replay sees the same snapshot. + +### Choosing a rollover threshold + +A common pattern is to roll over at 80 % pressure, leaving a comfortable margin for the +events that will be appended during the current turn: + +```rust +async fn eternal_monitor(ctx: OrchestrationContext, state_json: String) -> Result { + let state: State = serde_json::from_str(&state_json).unwrap_or_default(); + + // Roll over proactively before the hard cap is hit. + if ctx.history_pressure() >= 0.80 { + let trimmed = serde_json::to_string(&state.trim_for_handoff()).unwrap(); + return ctx.continue_as_new(trimmed).await; + } + + // Normal turn logic ... + let result = ctx.schedule_activity("DoWork", state_json.clone()).await?; + ctx.schedule_timer(std::time::Duration::from_secs(60)).await; + return ctx.continue_as_new(result).await; +} +``` + +### Enforcement and error shape (opt-in) + +By default the hard cap is **observed but not enforced** — the accessors are always +available but no turn is failed purely because of history size. Two `RuntimeOptions` +toggles control enforcement: + +| Toggle | Default | Effect | +|---|---|---| +| `enforce_size_limits` | `false` | When `true`, terminates turns that would exceed 5 MiB | +| `emit_limit_exceeded_errors` | `false` | When `true`, failed turns produce `ConfigErrorKind::LimitExceeded` instead of `Application::OrchestrationFailed` | + +Enable enforcement once you're confident your orchestrations roll over in time: + +```rust +let runtime = Runtime::builder(provider) + .with_options(RuntimeOptions { + enforce_size_limits: true, + emit_limit_exceeded_errors: true, + ..Default::default() + }) + .build(); +``` diff --git a/docs/proposals-impl/PROGRESS-size-limits.md b/docs/proposals-impl/PROGRESS-size-limits.md new file mode 100644 index 0000000..3becb6e --- /dev/null +++ b/docs/proposals-impl/PROGRESS-size-limits.md @@ -0,0 +1,147 @@ +# Size & Shape Limits — Implementation Progress + +**Spec:** [proposals/size-limits.md](../proposals/size-limits.md) +**Status:** In progress +**Last updated:** 2026-05-06 + +--- + +## Summary + +Implements three new hardcoded size limits (`MAX_HISTORY_BYTES = 5 MiB`, +`MAX_PAYLOAD_BYTES = 3 MiB`, `MAX_SMALL_VALUE_BYTES = 64 KiB`), a deterministic +`OrchestrationContext::history_size_bytes()` accessor, observability metrics, +and a `ConfigErrorKind::LimitExceeded` error variant. Enforcement and error +shape are gated by two `RuntimeOptions` toggles +(`enforce_size_limits`, `emit_limit_exceeded_errors`), both defaulting `false` +in the introducing release so upgrades preserve today's behavior. + +--- + +## Implementation Slices + +Each slice is a logical unit; multiple slices may land in a single PR. + +- [x] **Slice 1 — Constants + RuntimeOptions toggles** + - `MAX_HISTORY_BYTES`, `MAX_PAYLOAD_BYTES`, `MAX_SMALL_VALUE_BYTES` + in `src/runtime/limits.rs` + - `RuntimeOptions::enforce_size_limits: bool` (default `false`) + - `RuntimeOptions::emit_limit_exceeded_errors: bool` (default `false`) + - No behavior change yet; constants/fields purely additive. + +- [x] **Slice 2 — `HistoryManager::total_history_bytes` counter** + - Maintain `O(1)` running total, updated on append and during replay. + - Cache per-event serialized size on the in-memory `Event` record (or in + a parallel `Vec`) so the increment is `O(1)`. + - Always-on; foundational for slices 3 and 6. + +- [x] **Slice 3 — `OrchestrationContext` accessors** + - `history_size_bytes() -> usize` reads the value computed at this + turn boundary (excluding in-flight delta). + - `history_pressure() -> f32` returns `bytes / MAX_HISTORY_BYTES` + clamped to `0.0..=1.0`. + - Always-on; deterministic across replay. + +- [x] **Slice 4 — `ConfigErrorKind::LimitExceeded` variant** + - Add the variant to `src/lib.rs`. + - Add `display_message` arm. + - Update `fail_orchestration_for_limit()` (or equivalent) in + `src/runtime/dispatchers/orchestration.rs` to choose the error shape + based on `RuntimeOptions::emit_limit_exceeded_errors`. + - Pre-existing limit failures (`MAX_CUSTOM_STATUS_BYTES`, + `MAX_KV_VALUE_BYTES`, `MAX_KV_KEYS`, `MAX_TAG_NAME_BYTES`) continue + to emit `Application` when toggle is off (today's shape preserved). + +- [x] **Slice 5 — Tier-1 client checks** + - `Client::start_orchestration*`, `raise_event`, `enqueue_event*`, + `cancel_instance`: validate sizes and return + `Err(ClientError::Configuration(...))` when `enforce_size_limits` is on. + - Identifier checks (instance ID, orchestration name, event name, + queue name, cancel reason) use `MAX_SMALL_VALUE_BYTES`. + - Payload checks (start input, raise event payload, queue message) + use `MAX_PAYLOAD_BYTES` / `MAX_SMALL_VALUE_BYTES` per §4.1 mapping. + +- [x] **Slice 6 — Tier-2 `validate_limits()` extensions** + - Per-event checks for activity input, sub-orch input, sub-orch name, + activity name, session ID, continue-as-new input, orchestration + completion output, sub-orch result, orchestration error string. + - Aggregate history-cap check: pre-persist comparison of + `total_history_bytes + sum(serialized_size(delta))` against + `MAX_HISTORY_BYTES`. On violation, drop delta in memory, ack with + minimal terminal `OrchestrationFailed` event, drop side effects. + - All gated by `enforce_size_limits`. + +- [x] **Slice 7 — Tier-3 worker output check** + - Activity output size check before enqueueing `ActivityCompleted`. + - On violation, enqueue `ActivityFailed { details }` (shape per + `emit_limit_exceeded_errors`) and WARN-log size + BLAKE3 hash. + - Error string also bounded to `MAX_SMALL_VALUE_BYTES`; truncate + rather than fail. + - Add `blake3` dependency. + +- [x] **Slice 8 — Metrics + WARN logs** + - `duroxide.history.bytes` (always emitted, gauge per-instance on ack). + - `duroxide.payload.bytes { kind }` (always emitted, histogram per + payload site). + - `duroxide.limit.violations { resource }` (emitted on tier-1/2/3 fail). + - `duroxide.history.terminated_oversize { orchestration_name }` + (emitted when an instance terminates with `resource = "history"`). + - 75 % / 90 % WARN log thresholds, once per execution per threshold. + +- [x] **Slice 9 — Tests** + - `tests/scenarios/limits.rs` covering all 4 toggle combinations + and every test row in spec §8. + - Existing test updates where assertions on the `Application` shape + break under `(enforce=true, emit_le=true)` (these test the new + `Configuration::LimitExceeded` shape; old shape covered by + `existing_limit_failures_keep_application_when_emit_off`). + +- [x] **Slice 10 — Docs** + - `docs/continue-as-new.md` — new "Anticipating the history limit" + section with `history_pressure()` worked example. + - `docs/ORCHESTRATION-GUIDE.md` — refresh limits section, document + both toggles. + - `docs/proposals/core-improvements-roadmap.md` — mark §8 superseded. + - `TODO.md` — strike "Size limits" / "Limits everywhere…" lines. + +- [ ] **Slice 11 — Release housekeeping** *(at release time, not now)* + - `git mv docs/proposals/size-limits.md docs/proposals-impl/` + - Delete this PROGRESS file. + - Add `### Added` changelog entry; **no `### Breaking Changes`** + entry (defaults preserve today's behavior). + +--- + +## Implementation Notes + +### Slice 1 + +- Cargo is not installed in this development environment; build validation + was done via the language server (no errors reported in `limits.rs` or + `runtime/mod.rs`). User should run `cargo build --all-targets + --all-features` and `cargo nt` before merging. +- The proposal originally claimed there was a magic number `20` in + `prep_completions()` to be promoted to a `MAX_PENDING_EXTERNAL_EVENTS` + constant. Code inspection found no such constant — the only related cap + is `MAX_CARRY_FORWARD_EVENTS = 100`, which is already named. Spec was + corrected to drop that claim. + +### Slices 2–9 + +- Implemented and 1092/1092 tests pass (`cargo nt`, 2026-05-06). +- `blake3 = "1"` added to `Cargo.toml` for tier-3 forensic hashing. +- `HistoryManager` tracks `history_bytes` (baseline) and `delta_bytes` + (incremental) using `serialized_event_size()` (serde_json byte length). +- `OrchestrationContext` exposes `history_size_bytes()` and + `history_pressure()` as always-on, deterministic accessors. The value + is set once per turn from the working history before the orchestration + function runs, so it is stable and replay-safe within a turn. +- `ConfigErrorKind::LimitExceeded` is additive; `emit_limit_exceeded_errors` + defaults `false`, preserving the `Application` shape for all existing callers. +- Tier-2 `check_tier2_size_limits()` drops the in-memory delta before acking + a terminal failure event, which required clearing `metadata.pinned_duroxide_version` + to avoid a version-pinning invariant assertion when `OrchestrationStarted` + was no longer present in the delta. +- Metrics are always emitted regardless of `enforce_size_limits`; enforcement + only gates fail/truncate behavior. + diff --git a/docs/proposals/core-improvements-roadmap.md b/docs/proposals/core-improvements-roadmap.md index 47a5e74..47a1a98 100644 --- a/docs/proposals/core-improvements-roadmap.md +++ b/docs/proposals/core-improvements-roadmap.md @@ -466,7 +466,16 @@ CREATE TABLE registered_activities ( ## 8. Event Size Limits -### Problem +> **Status: Superseded — implemented in `docs/proposals/size-limits.md` (2026-05-06).** +> +> The final design differs from the sketch below: limits are hardcoded constants +> (not configurable per-runtime), enforcement is gated by two `RuntimeOptions` toggles +> (`enforce_size_limits`, `emit_limit_exceeded_errors`), and the error type uses +> `ConfigErrorKind::LimitExceeded` rather than a custom `SizeLimitError` enum. +> See [`docs/ORCHESTRATION-GUIDE.md#size-limits`](../ORCHESTRATION-GUIDE.md#size-limits) +> for the authoritative reference. + +### Problem (historical) Currently, there's no enforcement on the size of individual events. Large payloads in activity inputs/outputs, orchestration inputs, or external events can: - Cause memory pressure during replay (all events loaded into memory) diff --git a/docs/proposals/size-limits.md b/docs/proposals/size-limits.md new file mode 100644 index 0000000..80e7184 --- /dev/null +++ b/docs/proposals/size-limits.md @@ -0,0 +1,634 @@ +# Proposal: Size & Shape Limits (Simplified) + +**Status:** Implemented (slices 1–10 complete, 2026-05-06) +**Author:** AI-assisted +**Date:** 2026-05-05 +**Tracking:** TODO.md "Size limits" / "Limits everywhere…" +**Relates to:** `docs/proposals/core-improvements-roadmap.md` §8 *Event Size Limits* + +--- + +## 1. Problem + +Every value the user can hand to duroxide — an orchestration name, an activity +input, an external event payload, a KV value, a custom status string — ends up +persisted in history and shipped over the provider transport. Today the runtime +caps a small uneven subset (`MAX_CUSTOM_STATUS_BYTES`, `MAX_KV_VALUE_BYTES`, +`MAX_KV_KEYS`, `MAX_TAG_NAME_BYTES`) and leaves the rest unbounded. As a +result: + +- A 50 MiB activity output silently lands in history, gets re-read on every + replay, and may exceed provider row/column caps (Cosmos 2 MiB, DynamoDB 400 + KiB, SQLite/Postgres operator-imposed `CHECK` constraints). +- Replay cost grows with serialized history size; one oversized event can + push replay from sub-second to seconds. +- Provider-side rejection surfaces as an opaque infrastructure error far from + the schedule call that caused it. +- Orchestrations that grow their history without bound (long-running monitors, + unbounded retry loops) eventually fail in production with no warning that + the cliff was approaching. + +## 2. Goals & Non-Goals + +**Goals** +- Add **three** new size constants and one history-pressure accessor. That's + the entire surface change. +- Defensible numbers that fit inside SQLite, Postgres, and Cosmos provider + envelopes. +- Failures surface at the earliest deterministic point, with a clear error, + and oversized data **never reaches the provider**. +- Orchestration code can deterministically anticipate the history cap and + `continue_as_new()` before it fails. + +**Non-Goals** +- **Limit values are not configurable.** The three new caps and every existing + cap are `pub const`. Operators who need different numbers fork. Rationale: + per-cluster cap configurability introduces a per-instance pinning question + (does an instance carry the cap that was in effect when it started? what + about `continue_as_new`?), a mixed-cluster question (which cap wins?), and + a validation matrix. Forking three constants is cheaper than getting all of + that right. +- **No new constants beyond the three below.** Existing constants stay as-is + with their current values. We do not introduce per-call-site knobs. +- **No name shape validation** (control chars, reserved prefixes, whitespace + trimming). Out of scope; can be a separate proposal if a real bug demands it. +- **No claim-check / compression / chunking.** Document the pattern; runtime + does not implement it. +- **No provider trait changes.** All enforcement is runtime-side. + +Two runtime toggles **are** introduced (see §5.4) to manage rollout risk — +not to tune the cap values themselves. + +## 3. Current State (Baseline) + +Existing constants in [`src/runtime/limits.rs`](../../src/runtime/limits.rs) +remain **unchanged**: + +| Constant | Value | Scope | +|---|---|---| +| `MAX_CARRY_FORWARD_EVENTS` | 100 | unmatched persistent events across `continue_as_new` | +| `MAX_CUSTOM_STATUS_BYTES` | 256 KiB | `ctx.set_custom_status()` | +| `MAX_WORKER_TAGS` | 5 | tags per `TagFilter` | +| `MAX_TAG_NAME_BYTES` | 256 | activity tag name | +| `MAX_KV_KEYS` | 150 | user KV keys per instance | +| `MAX_KV_VALUE_BYTES` | 64 KiB | single KV value | + +## 4. The Three New Limits + +```rust +// src/runtime/limits.rs (additions) + +/// Maximum total serialized size of an execution's history, in bytes. +/// +/// Enforced before each ack: if appending the proposed `history_delta` would +/// push `total_history_bytes` past this cap, the delta is **discarded** and +/// the orchestration is failed with `Configuration::LimitExceeded { resource: +/// "history" }`. The oversized delta is never persisted. +/// +/// Orchestration code can read the running total via +/// `OrchestrationContext::history_size_bytes()` and roll over with +/// `continue_as_new()` before hitting this cap. +/// +/// 5 MiB — comfortably below every reference provider's per-row aggregate +/// limits, and large enough that typical workflows never approach it. +pub const MAX_HISTORY_BYTES: usize = 5 * 1024 * 1024; + +/// Maximum size of a "large payload" — the values that flow through +/// orchestration logic and live as a single event in history. +/// +/// Covers: activity input/output, orchestration input/output, +/// sub-orchestration input/output, `continue_as_new` carry-forward input. +/// +/// 3 MiB — over half of `MAX_HISTORY_BYTES` so a single big payload is +/// permitted, but two of them in the same execution will trip the history +/// cap and force a `continue_as_new` decision. +pub const MAX_PAYLOAD_BYTES: usize = 3 * 1024 * 1024; + +/// Maximum size for "small values" — short strings and discrete signals +/// that are not currently capped by an existing constant. +/// +/// Covers: orchestration name, activity name, sub-orchestration name, +/// event name, queue name, instance ID, session ID, external event payload, +/// queue message, error message, cancel reason. +/// +/// Does **not** apply to values already governed by an existing constant +/// (`MAX_CUSTOM_STATUS_BYTES`, `MAX_KV_VALUE_BYTES`, `MAX_TAG_NAME_BYTES`). +/// +/// 64 KiB — same order as `MAX_KV_VALUE_BYTES`, generous enough that +/// reasonable names/IDs (bytes-to-hundreds-of-bytes) are never near the cap, +/// and large enough for stack traces and moderate event payloads. +pub const MAX_SMALL_VALUE_BYTES: usize = 64 * 1024; +``` + +### 4.1 Per-call-site mapping + +| Call site | Limit | Tier | +|---|---|---| +| `Client::start_orchestration` input | `MAX_PAYLOAD_BYTES` | 1 (API) | +| `Client::start_orchestration` instance ID | `MAX_SMALL_VALUE_BYTES` | 1 | +| `Client::start_orchestration` orchestration name | `MAX_SMALL_VALUE_BYTES` | 1 | +| `Client::raise_event` payload | `MAX_SMALL_VALUE_BYTES` | 1 | +| `Client::raise_event` event name | `MAX_SMALL_VALUE_BYTES` | 1 | +| `Client::enqueue_event` message | `MAX_SMALL_VALUE_BYTES` | 1 | +| `Client::enqueue_event` queue name | `MAX_SMALL_VALUE_BYTES` | 1 | +| `Client::cancel_instance` reason | `MAX_SMALL_VALUE_BYTES` | 1 | +| `ctx.schedule_activity*` input | `MAX_PAYLOAD_BYTES` | 2 (validate_limits) | +| `ctx.schedule_activity*` activity name | `MAX_SMALL_VALUE_BYTES` | 2 | +| `ctx.schedule_activity_on_session*` session ID | `MAX_SMALL_VALUE_BYTES` | 2 | +| `ctx.schedule_sub_orchestration*` input | `MAX_PAYLOAD_BYTES` | 2 | +| `ctx.schedule_sub_orchestration*` name | `MAX_SMALL_VALUE_BYTES` | 2 | +| `ctx.continue_as_new(input)` | `MAX_PAYLOAD_BYTES` | 2 | +| Orchestration completion output | `MAX_PAYLOAD_BYTES` | 2 | +| Sub-orchestration result returned to parent | `MAX_PAYLOAD_BYTES` | 2 | +| Activity output | `MAX_PAYLOAD_BYTES` | 3 (worker) | +| Activity error string | `MAX_SMALL_VALUE_BYTES` | 3 | +| Orchestration error string | `MAX_SMALL_VALUE_BYTES` | 2 | +| **Aggregate**: total history bytes (per execution) | `MAX_HISTORY_BYTES` | 2 | + +Existing constants continue to govern: `MAX_CUSTOM_STATUS_BYTES` for +`ctx.set_custom_status()`, `MAX_KV_VALUE_BYTES` and `MAX_KV_KEYS` for KV, +`MAX_TAG_NAME_BYTES` for activity tag names, `MAX_CARRY_FORWARD_EVENTS` for +`continue_as_new`, and `MAX_WORKER_TAGS` for `TagFilter`_. +_ +## 5. Failure Mode + +**One terminal outcome. No retries. Two runtime toggles control rollout risk.** + +A new variant on the existing `ConfigErrorKind`: + +```rust +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum ConfigErrorKind { + Nondeterminism, + LimitExceeded, // NEW +} +``` + +The variant is *defined* unconditionally, but whether the runtime ever +*produces* it is gated by `RuntimeOptions::emit_limit_exceeded_errors` +(see §5.4). Whether the runtime *enforces* the new caps at all is gated +by `RuntimeOptions::enforce_size_limits`. + +The `resource: String` field on `ErrorDetails::Configuration` carries a +short stable identifier of the offending site: + +- `"history"` — total history size cap exceeded +- `"activity_input:MyActivity"`, `"activity_output:MyActivity"` +- `"orchestration_input"`, `"orchestration_output"` +- `"sub_orch_input:Child"`, `"sub_orch_output:Child"` +- `"external_event:OrderCancelled"`, `"queue_message:notifications"` +- `"name:activity"`, `"name:orchestration"`, `"name:event"`, `"name:queue"`, + `"name:session"`, `"identifier:instance_id"` +- `"error_message"`, `"cancel_reason"` + +The `message: Option` field is human-readable +(`"size 4194304 bytes exceeds limit 3145728 bytes"`), bounded to +`MAX_SMALL_VALUE_BYTES`, and not parsed by tooling. + +### 5.1 Three enforcement tiers + +All three tiers run only when `RuntimeOptions::enforce_size_limits == true`. +When the toggle is `false`, the runtime measures (counters, metrics, the +`history_size_bytes()` accessor) but never fails for a new-cap violation — +oversized values flow through to the provider as they do today. + +**Tier 1 — Client API (preferred).** `Client` methods that accept +user-controlled bytes validate before writing to the provider and return +`Err(ClientError::Configuration(...))`. Nothing reaches history. + +**Tier 2 — Orchestration turn (`validate_limits()`).** For values produced +*inside* an orchestration, the dispatcher inspects the proposed `history_delta` +**before** appending it. On any violation: + +1. The delta is **dropped in memory**. No part of it is persisted. +2. A minimal terminal delta is built: a single + `OrchestrationFailed { details }` event, where the error shape obeys + `emit_limit_exceeded_errors` (see §5.4). By construction the event is + well under any plausible cap. +3. The work item is acked with the minimal delta. Side effects from the + rejected delta (worker enqueues, sub-orch starts, external events) are + **not** enqueued — they were part of the same atomic ack that we replaced. +4. `duroxide.limit.violations { resource }` increments. + +**Tier 3 — Worker (activity output).** The activity worker checks the +serialized output size before enqueueing `ActivityCompleted`. On violation, +it enqueues `ActivityFailed { details }` instead, with `details` again +shaped per §5.4. The original output is dropped before reaching the +provider; the worker logs (WARN) `output_size_bytes` and `output_blake3_hex` +(16 hex chars) for forensic correlation. **Raw payload bytes are never +logged.** + +On the next orchestrator turn, the replay engine sees the failure and +treats it as terminal regardless of which shape was emitted: `Configuration` +errors abort the turn at the replay-engine level (existing behavior in +`replay_engine.rs`); `Application { retryable: false }` errors are +non-retryable application failures that today's `fail_orchestration_for_limit()` +path already produces. In both shapes the instance ends `Failed` and +orchestration code does **not** catch the failure via `?` on +`schedule_activity`. + +### 5.2 The history-cap check is pre-persist + +This is the central correctness point: **the runtime never persists a +history that exceeds `MAX_HISTORY_BYTES`.** + +Implementation: + +- `HistoryManager` maintains an incremental counter `total_history_bytes: + u64`, updated when events are appended or replayed (`O(1)` per event). + No full-history walks on the hot path. +- Before each ack, `validate_limits()` computes + `proposed_total = total_history_bytes + sum(serialized_size(e) for e in delta)`. +- If `proposed_total > MAX_HISTORY_BYTES`, the §5.1 tier-2 conversion + runs with `resource = "history"`. The minimal terminal delta is one + small event; appending it cannot itself exceed the cap (5 MiB minus a + few hundred bytes of headroom is a safe assumption). +- Per-event size violations (e.g., a 4 MiB activity input that fits the + per-payload cap but combined with existing history would push past 5 + MiB) are caught by the same check. + +The serialized-size measurement uses `String::len()` on the JSON-encoded +event (the same form the provider stores). This is `O(event)` once at +append time, then cached on the in-memory event record, so the running +total is always cheap to update. + +### 5.3 Behavior change vs. today (gated) + +Existing limit failures (`MAX_CUSTOM_STATUS_BYTES`, `MAX_KV_VALUE_BYTES`, +`MAX_KV_KEYS`, `MAX_TAG_NAME_BYTES`) currently emit +`Application { OrchestrationFailed, retryable: false }`. The long-term shape +is `Configuration { LimitExceeded }` for consistency with the new limits — a +hardcoded limit is a deployment fact, not an application failure, and +retrying through orchestration `?` cannot fix it. + +The migration is **gated by `emit_limit_exceeded_errors`** (§5.4): + +- **Toggle off (default initially)**: pre-existing limits keep emitting + `Application { OrchestrationFailed, retryable: false }`, exactly as today. + No mixed-cluster wire-format risk; no test breakage for assertions on the + `Application` shape. +- **Toggle on**: pre-existing limits emit `Configuration { LimitExceeded }`, + same shape as the new limits. Tests asserting `Application` for these + specific failures must be updated. Operators relying on metric labels see + these failures move from the application bucket to the configuration + bucket. + +When `enforce_size_limits` is on but `emit_limit_exceeded_errors` is off, +the new limits also emit `Application { OrchestrationFailed, retryable: +false }` rather than `Configuration { LimitExceeded }`. The wire format +remains a shape every existing runtime understands; the operator can adopt +the protective enforcement without committing to the wire-format change +until the cluster is fully on a `LimitExceeded`-aware version. + +### 5.4 Two runtime toggles + +```rust +// src/runtime/mod.rs (RuntimeOptions additions) +pub struct RuntimeOptions { + // ... existing fields ... + + /// When `true`, the runtime enforces `MAX_HISTORY_BYTES`, + /// `MAX_PAYLOAD_BYTES`, and `MAX_SMALL_VALUE_BYTES` at all three + /// tiers (Client API, orchestration turn, worker output). + /// + /// When `false` (default for the introducing release), the runtime + /// **measures** every value-bearing call site and updates metrics + /// (`duroxide.payload.bytes`, `duroxide.history.bytes`) but never + /// fails for a new-cap violation. Use the off state to observe + /// population pressure (via `duroxide.history.bytes` and + /// `OrchestrationContext::history_size_bytes()`) and refactor at-risk + /// orchestrations *before* turning enforcement on. + /// + /// Independent of `emit_limit_exceeded_errors`. + pub enforce_size_limits: bool, + + /// When `true`, every limit failure (both pre-existing and new) emits + /// `ErrorDetails::Configuration { kind: ConfigErrorKind::LimitExceeded, + /// resource, message }`. + /// + /// When `false` (default for the introducing release), every limit + /// failure emits `ErrorDetails::Application { kind: OrchestrationFailed, + /// retryable: false, message }` — the shape pre-existing limits use + /// today, recognizable to every prior duroxide version. Leave off + /// during a rolling upgrade until every node in the cluster has been + /// upgraded to a duroxide version that recognizes the `LimitExceeded` + /// variant; otherwise older nodes hit the existing + /// `FailedDeserialization` poison path on the unknown variant. + /// + /// Independent of `enforce_size_limits`. + pub emit_limit_exceeded_errors: bool, +} +``` + +The four toggle combinations all have a clear meaning: + +| `enforce` | `emit_le` | Behavior | +|---|---|---| +| `false` | `false` | Today's behavior, plus measurement metrics and `history_size_bytes()`. No failures for new caps; pre-existing limits emit `Application`. **Safe upgrade default.** | +| `true` | `false` | New caps enforced; all failures (new and pre-existing) emit `Application`. **Aggressive: protective enforcement without wire-format change.** Safe in mixed clusters. | +| `false` | `true` | New caps not enforced; pre-existing limits emit `Configuration::LimitExceeded`. Useful only for testing the new error shape; not a long-term posture. | +| `true` | `true` | Long-term target: full enforcement, consistent error shape. **Requires every cluster node to recognize the variant.** | + +The defaults flip in a future release (§11). + +A single canary node can flip either toggle to validate behavior, with +a config-change rollback path if anything goes sideways. The two-release +phased-rollout pattern is unnecessary because the wire-format risk is +gated by the toggle, not by the binary version. + +## 6. Anticipating the History Cap + +Hardcoded limits are only humane if orchestrations can see how close they +are. Three pieces: + +### 6.1 Orchestration accessor (deterministic) + +```rust +impl OrchestrationContext { + /// Bytes of serialized history for this execution at the current turn + /// boundary. Deterministic across replay (computed from already-replayed + /// events; does not include the in-flight delta of the current turn). + pub fn history_size_bytes(&self) -> usize; + + /// `history_size_bytes() / MAX_HISTORY_BYTES` as a 0.0..=1.0 ratio. + /// Convenience for "am I getting close to the cap?" checks. + pub fn history_pressure(&self) -> f32; +} +``` + +Canonical usage at a safe checkpoint: + +```rust +// Inside an orchestration loop that processes batches. +loop { + let batch = ctx.schedule_activity("FetchBatch", &cursor).await?; + if batch.is_empty() { return Ok("done".into()); } + process_batch(&ctx, &batch).await?; + cursor = next_cursor(&batch); + + // Roll over before history grows too large. + if ctx.history_pressure() > 0.75 { + return ctx.continue_as_new(cursor).await; + } +} +``` + +**Determinism.** Both methods are pure functions of replayed history at +the current turn boundary. Replays produce identical values. The current +turn's delta is excluded so the value is stable across the turn. + +### 6.2 Where the accessor can be called + +- **Inside orchestration code (`OrchestrationContext`)**: yes, always. This + is the only place it's both useful and deterministic. +- **Inside activity code (`ActivityContext`)**: not exposed. Activities + don't see history, can't `continue_as_new`, and produce exactly one + history event (their result). The check would be meaningless and + non-deterministic — activities aren't replayed, so a value read at run + time wouldn't be reproducible. +- **From orchestration code at any await point**: yes. The value reflects + history-as-of-this-turn-boundary. +- **From inside an activity scheduled by an orchestration**: no. The + orchestration must check before scheduling and pass any decision (e.g., + "this is the last batch") into the activity input. + +In short: **orchestration code only.** Activities ask their orchestration +to make the call and pass results in. + +### 6.3 Operator visibility (non-deterministic, runtime-side) + +Independent of orchestration code: + +- **Metric** `duroxide.history.bytes` (gauge / histogram, label + `{orchestration_name}`) — emitted on every ack with the post-ack + `total_history_bytes`. Gives operators a population view of how close + instances run to the cap. +- **Metric** `duroxide.history.terminated_oversize` (counter, label + `{orchestration_name}`) — increments when an instance fails with + `resource = "history"`. +- **WARN log** at 75 % and 90 % thresholds, with `instance_id`, + `orchestration_name`, `total_history_bytes`. Logged at most once per + threshold per execution to avoid spam. + +Operators who see population pressure rising act by either (i) asking the +app team to add a `continue_as_new` checkpoint, or (ii) shipping a code +change. The runtime does not auto-roll-over — only the orchestration +knows what state to carry forward. + +### 6.4 Documentation + +`docs/continue-as-new.md` gains a new section "Anticipating the history +limit" that walks through the `history_pressure()` checkpoint pattern, +shows a worked example, and links back to this proposal. + +## 7. Determinism + +Limits are baked into the runtime binary and never enter history. Replay +of an old history that was within the cap when written succeeds on a newer +binary regardless of any limit change (because the events already exist +and the per-event check only runs at append time, not on replay). Replay +of a history that contains a terminal limit-failure event (under either +emitted shape) always reaches the same terminal state. + +**The toggles are not part of replay state.** They are runtime-binary +configuration, identical in spirit to today's `MAX_CUSTOM_STATUS_BYTES` +constant. Two replicas of the same orchestration **must** be configured +identically — the same way they must run the same duroxide version. A +cluster with mixed toggle settings on the same instance pool can produce +different failure shapes for the same input, which is unsupported (and +identical to running mixed binary versions). Operators set the toggles +as part of their deployment configuration and roll them out the same way +they roll out a code change. + +`history_size_bytes()` and `history_pressure()` are deterministic because +they are computed from the same replayed history that all replicas see — +independent of the toggles. + +## 8. Testing Plan + +Unit tests in `src/runtime/limits.rs` cover the constants and the +`history_pressure()` arithmetic. + +Integration tests in `tests/scenarios/limits.rs`: + +| Test | Scenario | Expected | +|---|---|---| +| `activity_input_too_large` | Schedule activity with 4 MiB input | Tier-2 conversion: instance terminal with `Configuration::LimitExceeded { resource: "activity_input:..." }`; oversized delta never persisted | +| `activity_output_too_large` | Activity returns 4 MiB | Tier-3: `ActivityFailed { Configuration::LimitExceeded }`; instance terminal; raw output never logged; WARN contains size + BLAKE3 hash | +| `orchestration_input_too_large` | `start_orchestration` with 4 MiB input | Tier-1: `Err(ClientError::Configuration)`; nothing written to provider | +| `external_event_too_large` | `raise_event` with 100 KiB payload | Tier-1: `Err(ClientError::Configuration)` | +| `instance_id_too_long` | `start_orchestration` with 65 KiB instance ID | Tier-1: rejected | +| `orchestration_name_too_long` | Register orchestration with 65 KiB name | Rejected at registration (panic with `LimitViolation` payload) | +| `history_cap_terminates_instance` | Loop scheduling activities with 1 MiB outputs until total > 5 MiB | Instance terminal at the offending turn; oversized delta never persisted; counter `duroxide.history.terminated_oversize` increments | +| `history_size_bytes_is_deterministic` | Run orchestration to a known state, replay, assert `history_size_bytes()` returns the same value on every replay | Equal across replays | +| `history_pressure_drives_continue_as_new` | Orchestration that calls `continue_as_new` when `history_pressure() > 0.75`, runs through 10 generations | Each generation stays under cap; instance never fails for size | +| `existing_limit_failures_keep_application_when_emit_off` | Trigger `MAX_CUSTOM_STATUS_BYTES` etc. with `emit_limit_exceeded_errors = false` | Each produces `Application { OrchestrationFailed, retryable: false }` (preserves today's shape; verifies §5.3 toggle behavior) | +| `existing_limit_failures_become_configuration_when_emit_on` | Trigger same limits with `emit_limit_exceeded_errors = true` | Each produces `Configuration::LimitExceeded` | +| `enforce_off_measures_but_does_not_fail` | Schedule activity with 4 MiB input, `enforce_size_limits = false` | Orchestration succeeds; `duroxide.payload.bytes` records the size; no failure event | +| `enforce_on_emit_off_uses_application_shape` | Trigger any new-cap violation with `enforce_size_limits = true`, `emit_limit_exceeded_errors = false` | Failure event uses `Application { OrchestrationFailed, retryable: false }` shape (mixed-cluster-safe) | +| `tier2_failure_drops_side_effects` | Orchestration that schedules an activity *and* sets oversized custom status in the same turn (enforcement on) | Activity is **not** enqueued (rejected delta drops side effects); instance terminal | +| `replay_of_pre_limit_history_succeeds` | Hand-craft a history that contains a 4 MiB activity output (allowed when written), replay it under the new code | Replay succeeds; `history_size_bytes()` reports the (large) value; no spurious limit failure | +| `activity_output_log_no_payload_prefix` | Activity returns oversized output | WARN log contains `output_size_bytes` and `output_blake3_hex`; does **not** contain raw payload bytes | +| `multibyte_utf8_name_length` | Schedule activity with a 64-emoji name (256 UTF-8 bytes) and a 16385-emoji name (~64 KiB + 4 bytes) | First accepted, second rejected — confirms byte-length not char-length | + +## 9. Files Touched + +| File | Change | +|---|---| +| `src/runtime/limits.rs` | Add `MAX_HISTORY_BYTES`, `MAX_PAYLOAD_BYTES`, `MAX_SMALL_VALUE_BYTES`; no other changes to existing constants | +| `src/lib.rs` | Add `ConfigErrorKind::LimitExceeded` variant | +| `src/runtime/mod.rs` (`RuntimeOptions`) | Add `enforce_size_limits: bool` and `emit_limit_exceeded_errors: bool` fields (both default `false` in the introducing release; defaults flip in a later release per §11) | +| `src/runtime/dispatchers/orchestration.rs` | Extend `validate_limits()` with the new checks (gated by `enforce_size_limits`); implement the pre-persist history-cap check (drop oversized delta, ack with minimal terminal event); make `fail_orchestration_for_limit()` choose error shape per `emit_limit_exceeded_errors` | +| `src/runtime/dispatchers/worker.rs` | Activity output size check (Tier 3, gated by `enforce_size_limits`); error shape per `emit_limit_exceeded_errors`; BLAKE3-hash + size WARN log on oversized output; no raw bytes | +| `src/runtime/history_manager.rs` | Maintain `total_history_bytes: u64` incremental counter (`O(1)` per event, **always** — independent of toggles, since `history_size_bytes()` reads it); cache per-event serialized size on the in-memory record | +| `src/runtime/registry.rs` | Reject orchestration/activity names exceeding `MAX_SMALL_VALUE_BYTES` at register-time when `enforce_size_limits` is on (panic, matches existing duplicate-name behavior) | +| `src/client/mod.rs` | Tier-1 checks on every input-accepting client method (gated by `enforce_size_limits`); error shape per `emit_limit_exceeded_errors` | +| `src/lib.rs` (or `src/futures.rs`) | New `OrchestrationContext::history_size_bytes()` and `history_pressure()`; surfaced from the existing per-turn history snapshot. **Always available** (independent of toggles) | +| `docs/continue-as-new.md` | New "Anticipating the history limit" section with the `history_pressure()` pattern | +| `docs/ORCHESTRATION-GUIDE.md` | Refresh limits section; document both toggles | +| `docs/proposals/core-improvements-roadmap.md` | Mark §8 superseded by this doc | +| `TODO.md` | Strike "Size limits" / "Limits everywhere…" lines | +| `tests/scenarios/limits.rs` | New scenario file (see §8) — must cover all four toggle combinations | + +No provider trait changes. The only event-schema addition is the +`ConfigErrorKind::LimitExceeded` variant, and it is *defined* in the +introducing release but only *produced* when `emit_limit_exceeded_errors` +is on — an operator-controlled gate, not a binary-version gate. + +## 10. Rolling Upgrade + +The two toggles in §5.4 manage the two distinct rolling-upgrade risks: + +- **Wire-format risk** — old runtimes can't deserialize + `Configuration::LimitExceeded` and would hit the existing + `FailedDeserialization` poison path on the unknown variant. Gated by + `emit_limit_exceeded_errors`. Leave off during the upgrade window; + flip on once every node recognizes the variant. +- **In-flight-orchestration risk** — orchestrations that have been + quietly accumulating large history or large payloads under the + pre-upgrade runtime will start failing the moment caps are enforced. + Gated by `enforce_size_limits`. Leave off after upgrade, observe + population pressure via `duroxide.history.bytes` and the + `history_size_bytes()` accessor, refactor at-risk orchestrations, + *then* flip on. + +Other rules: + +- **A history written under an older runtime is always replayable** by a + newer runtime, regardless of cap changes or toggle settings. Per-event + size checks run only at append time; replay does not re-validate. +- **Toggle settings must be uniform within an instance pool.** Mixed + toggle settings on nodes processing the same instance produce different + failure shapes for the same input — same caveat as running mixed + binary versions. Treat the toggles as part of deployment configuration. +- **Tightening a cap in a future release** would invalidate previously-OK + in-flight orchestrations on their next turn. We do not plan to tighten + these caps after shipping; if we ever do, it is a major-version event + with explicit release notes. + +This is consistent with the established pattern in CHANGELOG.md (e.g., +0.1.18 "Provider Capability Filtering" — a feature gated and rolled out +the same way) and with the documented rolling-upgrade conventions in +`.github/copilot-instructions.md` ("flag any change that would break +mixed-version clusters" — flagged here, with operator-controlled gates +for each risk). + +## 11. Rollout + +A single release ships all the machinery; operators flip toggles when +their cluster is ready. A later release flips defaults. + +**Release N (the introducing release):** +- Add the three constants and `ConfigErrorKind::LimitExceeded` variant. +- Add `OrchestrationContext::history_size_bytes()` / + `history_pressure()`. +- Add `HistoryManager::total_history_bytes` counter. +- Add `RuntimeOptions::enforce_size_limits` (default `false`) and + `RuntimeOptions::emit_limit_exceeded_errors` (default `false`). +- Wire all tier-1 / tier-2 / tier-3 checks behind `enforce_size_limits`. +- Wire the `Application` ↔ `Configuration::LimitExceeded` shape choice + behind `emit_limit_exceeded_errors`. +- Add `duroxide.history.bytes` (always emitted, useful even with + enforcement off), `duroxide.payload.bytes` (always emitted), + `duroxide.history.terminated_oversize` (only fires when enforcement + on), `duroxide.limit.violations` (only fires when enforcement on), + 75 % / 90 % WARN logs. +- Update `docs/continue-as-new.md`, `docs/ORCHESTRATION-GUIDE.md`, + `TODO.md`. Document both toggles and the recommended rollout sequence. +- Land the test suite from §8 covering all four toggle combinations. +- Changelog entry under `### Added`. **No `### Breaking Changes` entry** + — defaults preserve all existing behavior. + +**Operator rollout sequence (recommended, single release):** + +1. Upgrade every cluster node to Release N. Toggles default off; nothing + changes behaviorally. +2. Observe `duroxide.history.bytes` and `duroxide.payload.bytes` metrics + for at least one full duty cycle. Identify any orchestrations + approaching `MAX_HISTORY_BYTES` or producing oversized payloads. +3. Refactor at-risk orchestrations to add `continue_as_new` checkpoints + using `history_pressure()`. +4. Flip `enforce_size_limits = true` cluster-wide (canary first if + desired). Errors still emit `Application`, so mixed-cluster + wire-format remains safe. +5. Once every node is on Release N (or later), flip + `emit_limit_exceeded_errors = true` cluster-wide. + +**Release N+K (some future minor version):** flip both defaults to +`true`. *That* release ships with a `### Breaking Changes` entry +covering (a) the new caps becoming enforced by default, and (b) the +migration of existing limit failures from `Application` to +`Configuration::LimitExceeded`. By then, the variant has been +deserializable for K releases. + +## 12. Decision Log + +- **Why are the cap *values* hardcoded?** Configurable values introduce a + per-instance pinning question (does an instance carry the cap that was + in effect when it started? what about `continue_as_new`?), a + mixed-cluster question (which cap wins?), and a validation matrix. + Forking three constants is cheaper than getting all of that right. +- **Why two on/off toggles, then?** The two rollout risks are independent + — the wire-format change (`emit_limit_exceeded_errors`) and the + in-flight enforcement change (`enforce_size_limits`). A single toggle + forces operators to take both at once or neither; two toggles let each + cluster pick its sequence. The toggle surface is two bools — much + smaller than a per-cap configuration story. +- **Why default both toggles to `false` in the introducing release?** A + duroxide upgrade should never silently fail in-flight orchestrations + or change wire format. Defaults preserve today's behavior; operators + opt in when ready. The defaults flip together in a later release whose + changelog entry calls out the breaking change. +- **Why allow the `(enforce=true, emit_le=false)` combination?** It lets + operators get the protective benefit of the new caps immediately on + upgrade while deferring the wire-format change until cluster rollout + is complete. Mixed-cluster safe by construction. Without this + combination, conservative operators would have to wait through a full + rolling upgrade *before* any size protection kicks in. +- **Why three constants and not one per call site?** Three covers every + meaningful tier (history aggregate, large per-event payload, small + per-event value) without inviting per-call-site bikeshedding. If a + real bug ever demands a tighter cap on a specific call site, splitting + one constant later is backward-compatible. +- **Why 5 / 3 / 64 KiB?** 5 MiB sits comfortably below every reference + provider's per-row aggregate cap. 3 MiB lets a single big payload through + but forces a `continue_as_new` decision before the second one. 64 KiB + matches the existing `MAX_KV_VALUE_BYTES` and is generous enough that + reasonable names/IDs are nowhere near the cap. +- **Why pre-persist history check?** Persisting a delta and then failing + the orchestration would bake the oversized event into history, making + replay even more expensive and the failure permanently visible. Dropping + the delta keeps history at the last known-good state plus one small + failure event. +- **Why `Configuration` and not `Application` (long-term)?** A hardcoded + limit is a deployment fact, not an application failure. `Configuration` + errors abort the turn at the replay-engine level (existing behavior), + which is what we want — application code should not retry-loop on a + non-retryable condition. The `Application` shape is the *transitional* + shape during rollout, controlled by `emit_limit_exceeded_errors`. +- **Why is `history_pressure()` orchestration-only?** It's deterministic + only inside replay (which is where `OrchestrationContext` lives). Inside + an activity it would be non-deterministic (activities aren't replayed) + and meaningless (activities can't `continue_as_new`). +- **Why no shape validation (control chars, reserved prefixes)?** Out of + scope; not motivated by a current bug; can be a separate proposal. diff --git a/src/client/mod.rs b/src/client/mod.rs index d24ebce..90d2f39 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use metrics::counter; use tracing::info; use crate::_typed_codec::{Codec, Json}; @@ -26,6 +27,16 @@ pub enum ClientError { /// Invalid input (client validation) InvalidInput { message: String }, + /// A hardcoded runtime size or shape limit was exceeded. + /// + /// `resource` is a short stable identifier of the offending site + /// (e.g. `"orchestration_input"`, `"name:event"`, `"identifier:instance_id"`). + /// `message` is human-readable. + /// + /// Only produced when [`Client::with_size_limits_enforced`] has been + /// called on this client (mirrors `RuntimeOptions::enforce_size_limits`). + Configuration { resource: String, message: String }, + /// Operation timed out Timeout, @@ -46,6 +57,7 @@ impl ClientError { ClientError::Provider(e) => e.is_retryable(), ClientError::ManagementNotAvailable => false, ClientError::InvalidInput { .. } => false, + ClientError::Configuration { .. } => false, ClientError::Timeout => true, ClientError::InstanceStillRunning { .. } => false, ClientError::CannotDeleteSubOrchestration { .. } => false, @@ -63,6 +75,9 @@ impl std::fmt::Display for ClientError { "Management features not available - provider doesn't implement ProviderAdmin" ), ClientError::InvalidInput { message } => write!(f, "Invalid input: {message}"), + ClientError::Configuration { resource, message } => { + write!(f, "Limit exceeded ({resource}): {message}") + } ClientError::Timeout => write!(f, "Operation timed out"), ClientError::InstanceStillRunning { instance_id } => write!( f, @@ -166,6 +181,12 @@ const POLL_DELAY_MULTIPLIER: u64 = 2; /// ``` pub struct Client { store: Arc, + /// When `true`, every input-accepting client method validates user-controlled + /// bytes against the size limits in `crate::runtime::limits` and returns + /// `ClientError::Configuration` on violation. Mirrors + /// `RuntimeOptions::enforce_size_limits` so an operator can flip both + /// sides of a deployment in lockstep. Defaults to `false`. + enforce_size_limits: bool, } impl Client { @@ -184,7 +205,50 @@ impl Client { /// let client2 = client.clone(); /// ``` pub fn new(store: Arc) -> Self { - Self { store } + Self { + store, + enforce_size_limits: false, + } + } + + /// Enable client-side size limit enforcement on this `Client`. + /// + /// When enabled, methods that accept user-controlled bytes + /// (e.g. `start_orchestration`, `raise_event`, `enqueue_event`, + /// `cancel_instance`) validate inputs against the size constants in + /// [`crate::runtime::limits`] **before** writing to the provider, and + /// return [`ClientError::Configuration`] on violation. Nothing reaches + /// history. + /// + /// Operators flipping `RuntimeOptions::enforce_size_limits = true` should + /// also flip this on every `Client` they hand out, so call sites get + /// the early, well-localized error instead of the runtime-side terminal + /// failure on the next orchestration turn. + pub fn with_size_limits_enforced(mut self) -> Self { + self.enforce_size_limits = true; + self + } + + /// Validate that `value` (named `field`, identified by `resource`) is + /// within `max_bytes`. Returns `Ok(())` immediately when enforcement + /// is disabled. + fn check_size(&self, resource: &str, field: &str, value: &str, max_bytes: usize) -> Result<(), ClientError> { + if !self.enforce_size_limits { + return Ok(()); + } + if value.len() > max_bytes { + // Emit tier-1 limit violation counter via the `metrics` facade. + counter!( + "duroxide_limit_violations_total", + "resource" => resource.to_string(), + ) + .increment(1); + return Err(ClientError::Configuration { + resource: resource.to_string(), + message: format!("{field} size ({} bytes) exceeds limit ({max_bytes} bytes)", value.len()), + }); + } + Ok(()) } /// Start an orchestration instance with string input. @@ -237,10 +301,31 @@ impl Client { orchestration: impl Into, input: impl Into, ) -> Result<(), ClientError> { + let instance = instance.into(); + let orchestration = orchestration.into(); + let input = input.into(); + self.check_size( + "identifier:instance_id", + "instance_id", + &instance, + crate::runtime::limits::MAX_SMALL_VALUE_BYTES, + )?; + self.check_size( + "name:orchestration", + "orchestration_name", + &orchestration, + crate::runtime::limits::MAX_SMALL_VALUE_BYTES, + )?; + self.check_size( + "orchestration_input", + "orchestration_input", + &input, + crate::runtime::limits::MAX_PAYLOAD_BYTES, + )?; let item = WorkItem::StartOrchestration { - instance: instance.into(), - orchestration: orchestration.into(), - input: input.into(), + instance, + orchestration, + input, version: None, parent_instance: None, parent_id: None, @@ -264,11 +349,33 @@ impl Client { version: impl Into, input: impl Into, ) -> Result<(), ClientError> { + let instance = instance.into(); + let orchestration = orchestration.into(); + let version = version.into(); + let input = input.into(); + self.check_size( + "identifier:instance_id", + "instance_id", + &instance, + crate::runtime::limits::MAX_SMALL_VALUE_BYTES, + )?; + self.check_size( + "name:orchestration", + "orchestration_name", + &orchestration, + crate::runtime::limits::MAX_SMALL_VALUE_BYTES, + )?; + self.check_size( + "orchestration_input", + "orchestration_input", + &input, + crate::runtime::limits::MAX_PAYLOAD_BYTES, + )?; let item = WorkItem::StartOrchestration { - instance: instance.into(), - orchestration: orchestration.into(), - input: input.into(), - version: Some(version.into()), + instance, + orchestration, + input, + version: Some(version), parent_instance: None, parent_id: None, execution_id: crate::INITIAL_EXECUTION_ID, @@ -378,11 +485,28 @@ impl Client { event_name: impl Into, data: impl Into, ) -> Result<(), ClientError> { - let item = WorkItem::ExternalRaised { - instance: instance.into(), - name: event_name.into(), - data: data.into(), - }; + let instance = instance.into(); + let name = event_name.into(); + let data = data.into(); + self.check_size( + "identifier:instance_id", + "instance_id", + &instance, + crate::runtime::limits::MAX_SMALL_VALUE_BYTES, + )?; + self.check_size( + "name:event", + "event_name", + &name, + crate::runtime::limits::MAX_SMALL_VALUE_BYTES, + )?; + self.check_size( + &format!("external_event:{name}"), + "event_payload", + &data, + crate::runtime::limits::MAX_SMALL_VALUE_BYTES, + )?; + let item = WorkItem::ExternalRaised { instance, name, data }; self.store .enqueue_for_orchestrator(item, None) .await @@ -423,11 +547,28 @@ impl Client { queue: impl Into, data: impl Into, ) -> Result<(), ClientError> { - let item = WorkItem::QueueMessage { - instance: instance.into(), - name: queue.into(), - data: data.into(), - }; + let instance = instance.into(); + let name = queue.into(); + let data = data.into(); + self.check_size( + "identifier:instance_id", + "instance_id", + &instance, + crate::runtime::limits::MAX_SMALL_VALUE_BYTES, + )?; + self.check_size( + "name:queue", + "queue_name", + &name, + crate::runtime::limits::MAX_SMALL_VALUE_BYTES, + )?; + self.check_size( + &format!("queue_message:{name}"), + "queue_message", + &data, + crate::runtime::limits::MAX_SMALL_VALUE_BYTES, + )?; + let item = WorkItem::QueueMessage { instance, name, data }; self.store .enqueue_for_orchestrator(item, None) .await @@ -562,10 +703,21 @@ impl Client { instance: impl Into, reason: impl Into, ) -> Result<(), ClientError> { - let item = WorkItem::CancelInstance { - instance: instance.into(), - reason: reason.into(), - }; + let instance = instance.into(); + let reason = reason.into(); + self.check_size( + "identifier:instance_id", + "instance_id", + &instance, + crate::runtime::limits::MAX_SMALL_VALUE_BYTES, + )?; + self.check_size( + "cancel_reason", + "cancel_reason", + &reason, + crate::runtime::limits::MAX_SMALL_VALUE_BYTES, + )?; + let item = WorkItem::CancelInstance { instance, reason }; self.store .enqueue_for_orchestrator(item, None) .await diff --git a/src/lib.rs b/src/lib.rs index 75417ff..5bca46c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -961,6 +961,15 @@ pub enum PoisonMessageType { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub enum ConfigErrorKind { Nondeterminism, + /// A hardcoded runtime size or shape limit was exceeded. + /// + /// The associated `resource` field on [`ErrorDetails::Configuration`] + /// identifies the offending site (e.g. `"history"`, + /// `"activity_input:MyActivity"`, `"name:event"`). The `message` field + /// is human-readable and not parsed by tooling. + /// + /// See `docs/proposals/size-limits.md` for the full taxonomy. + LimitExceeded, } /// Application error kinds. @@ -1008,6 +1017,10 @@ impl ErrorDetails { .as_ref() .map(|m| format!("nondeterministic: {m}")) .unwrap_or_else(|| format!("nondeterministic in {resource}")), + ConfigErrorKind::LimitExceeded => message + .as_ref() + .map(|m| format!("limit exceeded ({resource}): {m}")) + .unwrap_or_else(|| format!("limit exceeded ({resource})")), }, ErrorDetails::Application { kind, message, .. } => match kind { AppErrorKind::Cancelled { reason } => format!("canceled: {reason}"), @@ -1662,6 +1675,16 @@ struct CtxInner { /// Keys written during the current turn are marked with `u64::MAX` (never prunable /// until the next turn, when their real timestamp will appear in the snapshot). kv_metadata: std::collections::HashMap, + + /// Total serialized bytes of the persisted history at the start of this turn. + /// + /// Surfaced via [`OrchestrationContext::history_size_bytes`] so orchestration + /// code can deterministically anticipate `MAX_HISTORY_BYTES` and roll over + /// with `continue_as_new()` before hitting the cap. + /// + /// Stable for the duration of a turn (excludes events emitted by this turn). + /// Seeded by the replay engine before invoking the orchestration handler. + history_size_bytes_at_turn_start: usize, } impl CtxInner { @@ -1713,6 +1736,7 @@ impl CtxInner { kv_state: std::collections::HashMap::new(), kv_metadata: std::collections::HashMap::new(), + history_size_bytes_at_turn_start: 0, } } @@ -2408,6 +2432,51 @@ impl OrchestrationContext { self.inner.lock().unwrap().is_replaying = is_replaying; } + /// Total serialized bytes of this execution's history at the start of the + /// current turn. + /// + /// Excludes events emitted by the in-flight turn, so the value is stable + /// for the duration of `await` resumptions and identical across replays + /// of the same turn. Use it together with [`MAX_HISTORY_BYTES`] (or the + /// convenience [`history_pressure`](Self::history_pressure)) to decide + /// when to roll over with `continue_as_new()` before the runtime fails + /// the orchestration with `LimitExceeded { resource: "history" }`. + /// + /// [`MAX_HISTORY_BYTES`]: crate::runtime::limits::MAX_HISTORY_BYTES + /// + /// # Example + /// + /// ```rust,no_run + /// # use duroxide::OrchestrationContext; + /// # async fn example(ctx: OrchestrationContext) -> Result { + /// if ctx.history_pressure() > 0.75 { + /// return ctx.continue_as_new(String::new()).await; + /// } + /// # Ok(String::new()) + /// # } + /// ``` + pub fn history_size_bytes(&self) -> usize { + self.inner.lock().unwrap().history_size_bytes_at_turn_start + } + + /// Ratio of [`history_size_bytes`](Self::history_size_bytes) to + /// [`MAX_HISTORY_BYTES`](crate::runtime::limits::MAX_HISTORY_BYTES), + /// clamped to `0.0..=1.0`. + /// + /// Convenience wrapper for the common "am I close to the cap?" check. + pub fn history_pressure(&self) -> f32 { + let bytes = self.history_size_bytes() as f32; + let cap = crate::runtime::limits::MAX_HISTORY_BYTES as f32; + (bytes / cap).clamp(0.0, 1.0) + } + + /// Seed the turn-start history size (called by the replay engine before + /// invoking the orchestration handler). + #[doc(hidden)] + pub fn set_history_size_bytes(&self, bytes: usize) { + self.inner.lock().unwrap().history_size_bytes_at_turn_start = bytes; + } + /// Bind an external subscription to a schedule_id (used by replay engine and test harness). #[doc(hidden)] pub fn bind_external_subscription(&self, schedule_id: u64, name: &str) { diff --git a/src/runtime/dispatchers/orchestration.rs b/src/runtime/dispatchers/orchestration.rs index 72488ab..751e5cc 100644 --- a/src/runtime/dispatchers/orchestration.rs +++ b/src/runtime/dispatchers/orchestration.rs @@ -13,6 +13,7 @@ use crate::providers::{ExecutionMetadata, ProviderError, ScheduledActivityIdentifier, WorkItem}; use crate::{Event, EventKind}; +use metrics::counter; use std::sync::Arc; use std::sync::atomic::Ordering; use std::time::Duration; @@ -29,6 +30,11 @@ use super::super::{HistoryManager, Runtime, WorkItemReader}; /// - KV value size limit ([`crate::runtime::limits::MAX_KV_VALUE_BYTES`]) /// - KV key count limit ([`crate::runtime::limits::MAX_KV_KEYS`]) /// +/// `emit_limit_exceeded_errors` selects the error shape produced for any +/// violation: when `true`, failures use `Configuration::LimitExceeded`; +/// when `false`, failures preserve today's `Application::OrchestrationFailed` +/// shape (mixed-cluster-safe during a rolling upgrade). +/// /// Returns `true` if a limit was violated (orchestration marked as failed). #[allow(clippy::too_many_arguments)] fn validate_limits( @@ -40,6 +46,8 @@ fn validate_limits( instance: &str, execution_id: u64, kv_snapshot: &std::collections::HashMap, + emit_limit_exceeded_errors: bool, + enforce_size_limits: bool, ) -> bool { // --- Custom status size --- let last_custom_status = history_delta.iter().rev().find_map(|e| { @@ -62,6 +70,7 @@ fn validate_limits( "Custom status exceeds size limit, failing orchestration" ); fail_orchestration_for_limit( + "custom_status", format!( "Custom status size ({} bytes) exceeds limit ({} bytes)", s.len(), @@ -73,6 +82,8 @@ fn validate_limits( orchestrator_items, instance, execution_id, + emit_limit_exceeded_errors, + false, // pre-existing limit: keep delta for backward compatibility ); return true; } @@ -106,6 +117,7 @@ fn validate_limits( "Activity tag exceeds size limit, failing orchestration" ); fail_orchestration_for_limit( + &format!("tag:{}", activity_name), format!( "Activity '{}' tag size ({} bytes) exceeds limit ({} bytes)", activity_name, @@ -118,6 +130,8 @@ fn validate_limits( orchestrator_items, instance, execution_id, + emit_limit_exceeded_errors, + false, // pre-existing limit: keep delta for backward compatibility ); return true; } @@ -146,6 +160,7 @@ fn validate_limits( "KV value exceeds size limit, failing orchestration" ); fail_orchestration_for_limit( + &format!("kv_value:{}", key), format!( "KV value for key '{}' ({} bytes) exceeds limit ({} bytes)", key, @@ -158,6 +173,8 @@ fn validate_limits( orchestrator_items, instance, execution_id, + emit_limit_exceeded_errors, + false, // pre-existing limit: keep delta for backward compatibility ); return true; } @@ -192,6 +209,7 @@ fn validate_limits( "KV key count exceeds limit, failing orchestration" ); fail_orchestration_for_limit( + "kv_keys", format!( "KV key count ({}) exceeds limit ({})", user_key_count, @@ -203,6 +221,219 @@ fn validate_limits( orchestrator_items, instance, execution_id, + emit_limit_exceeded_errors, + false, // pre-existing limit: keep delta for backward compatibility + ); + return true; + } + + // --------------------------------------------------------------------- + // Tier-2 size limits (docs/proposals/size-limits.md § 5.1). + // + // Gated by `enforce_size_limits`: when off the runtime measures but + // does not fail. When on we walk the freshly produced delta and reject + // any oversized payload, name, identifier, or error string. We also + // run the aggregate `MAX_HISTORY_BYTES` check before persistence so + // an oversized delta never reaches the provider. + // --------------------------------------------------------------------- + if enforce_size_limits + && check_tier2_size_limits( + history_delta, + history_mgr, + metadata, + worker_items, + orchestrator_items, + instance, + execution_id, + emit_limit_exceeded_errors, + ) + { + return true; + } + + false +} + +/// Per-event tier-2 size checks plus the aggregate history-cap check. +/// +/// Returns `true` when a limit was violated and the orchestration was +/// failed. Side effects on violation: drop the in-memory delta, append a +/// single terminal `OrchestrationFailed` event, clear pending worker and +/// orchestrator items. +#[allow(clippy::too_many_arguments)] +fn check_tier2_size_limits( + history_delta: &[Event], + history_mgr: &mut HistoryManager, + metadata: &mut ExecutionMetadata, + worker_items: &mut Vec, + orchestrator_items: &mut Vec, + instance: &str, + execution_id: u64, + emit_limit_exceeded_errors: bool, +) -> bool { + use crate::runtime::limits::{MAX_PAYLOAD_BYTES, MAX_SMALL_VALUE_BYTES}; + + // Helper: trip a tier-2 failure with delta-drop semantics. + let trip = |resource: String, message: String, hm: &mut HistoryManager, md: &mut ExecutionMetadata, wi: &mut Vec, oi: &mut Vec| { + tracing::error!( + target: "duroxide::runtime", + instance_id = %instance, + execution_id = %execution_id, + resource = %resource, + message = %message, + "Tier-2 size limit exceeded, failing orchestration" + ); + fail_orchestration_for_limit( + &resource, + message, + hm, + md, + wi, + oi, + instance, + execution_id, + emit_limit_exceeded_errors, + true, // tier-2: drop delta per spec § 5.1 + ); + }; + + for event in history_delta { + match &event.kind { + EventKind::ActivityScheduled { + name, + input, + session_id, + .. + } => { + if name.len() > MAX_SMALL_VALUE_BYTES { + trip( + "name:activity".into(), + format!("Activity name size ({} bytes) exceeds limit ({MAX_SMALL_VALUE_BYTES} bytes)", name.len()), + history_mgr, metadata, worker_items, orchestrator_items, + ); + return true; + } + if input.len() > MAX_PAYLOAD_BYTES { + trip( + format!("activity_input:{name}"), + format!("Activity '{name}' input size ({} bytes) exceeds limit ({MAX_PAYLOAD_BYTES} bytes)", input.len()), + history_mgr, metadata, worker_items, orchestrator_items, + ); + return true; + } + if let Some(sid) = session_id + && sid.len() > MAX_SMALL_VALUE_BYTES + { + trip( + "name:session".into(), + format!("Activity '{name}' session_id size ({} bytes) exceeds limit ({MAX_SMALL_VALUE_BYTES} bytes)", sid.len()), + history_mgr, metadata, worker_items, orchestrator_items, + ); + return true; + } + } + EventKind::SubOrchestrationScheduled { name, instance: _, input } => { + if name.len() > MAX_SMALL_VALUE_BYTES { + trip( + "name:sub_orchestration".into(), + format!("Sub-orchestration name size ({} bytes) exceeds limit ({MAX_SMALL_VALUE_BYTES} bytes)", name.len()), + history_mgr, metadata, worker_items, orchestrator_items, + ); + return true; + } + if input.len() > MAX_PAYLOAD_BYTES { + trip( + format!("sub_orch_input:{name}"), + format!("Sub-orchestration '{name}' input size ({} bytes) exceeds limit ({MAX_PAYLOAD_BYTES} bytes)", input.len()), + history_mgr, metadata, worker_items, orchestrator_items, + ); + return true; + } + } + EventKind::OrchestrationChained { name, input, .. } => { + if name.len() > MAX_SMALL_VALUE_BYTES { + trip( + "name:orchestration".into(), + format!("Detached orchestration name size ({} bytes) exceeds limit ({MAX_SMALL_VALUE_BYTES} bytes)", name.len()), + history_mgr, metadata, worker_items, orchestrator_items, + ); + return true; + } + if input.len() > MAX_PAYLOAD_BYTES { + trip( + format!("orchestration_input:{name}"), + format!("Detached orchestration '{name}' input size ({} bytes) exceeds limit ({MAX_PAYLOAD_BYTES} bytes)", input.len()), + history_mgr, metadata, worker_items, orchestrator_items, + ); + return true; + } + } + EventKind::OrchestrationCompleted { output } => { + if output.len() > MAX_PAYLOAD_BYTES { + trip( + "orchestration_output".into(), + format!("Orchestration output size ({} bytes) exceeds limit ({MAX_PAYLOAD_BYTES} bytes)", output.len()), + history_mgr, metadata, worker_items, orchestrator_items, + ); + return true; + } + } + EventKind::SubOrchestrationCompleted { result } => { + if result.len() > MAX_PAYLOAD_BYTES { + trip( + "sub_orch_output".into(), + format!("Sub-orchestration result size ({} bytes) exceeds limit ({MAX_PAYLOAD_BYTES} bytes)", result.len()), + history_mgr, metadata, worker_items, orchestrator_items, + ); + return true; + } + } + EventKind::OrchestrationContinuedAsNew { input } => { + if input.len() > MAX_PAYLOAD_BYTES { + trip( + "continue_as_new_input".into(), + format!("continue_as_new input size ({} bytes) exceeds limit ({MAX_PAYLOAD_BYTES} bytes)", input.len()), + history_mgr, metadata, worker_items, orchestrator_items, + ); + return true; + } + } + EventKind::OrchestrationFailed { details } + | EventKind::SubOrchestrationFailed { details } + | EventKind::ActivityFailed { details } => { + let msg = details.display_message(); + if msg.len() > MAX_SMALL_VALUE_BYTES { + trip( + "error_message".into(), + format!("Error message size ({} bytes) exceeds limit ({MAX_SMALL_VALUE_BYTES} bytes)", msg.len()), + history_mgr, metadata, worker_items, orchestrator_items, + ); + return true; + } + } + _ => {} + } + } + + // Aggregate history-cap check (docs/proposals/size-limits.md § 5.2). + // + // The pre-persist check uses the running counters maintained by + // HistoryManager: if `baseline_bytes + delta_bytes` would exceed + // MAX_HISTORY_BYTES we drop the delta and emit a single small + // terminal failure event. The minimal delta is well under the cap, so + // appending it cannot itself exceed the limit. + let proposed = history_mgr.proposed_total_bytes() as usize; + if proposed > crate::runtime::limits::MAX_HISTORY_BYTES { + trip( + "history".into(), + format!( + "Total history size ({proposed} bytes) would exceed limit ({} bytes)", + crate::runtime::limits::MAX_HISTORY_BYTES, + ), + history_mgr, + metadata, + worker_items, + orchestrator_items, ); return true; } @@ -211,7 +442,21 @@ fn validate_limits( } /// Shared helper to fail an orchestration due to a limit violation. +/// +/// `resource` is a short stable identifier of the offending site +/// (e.g. `"custom_status"`, `"history"`, `"kv_value:foo"`). It is used as +/// the `resource` field on `Configuration::LimitExceeded` when +/// `emit_limit_exceeded_errors` is true; otherwise the `Application` shape +/// is preserved (today's behavior, mixed-cluster-safe). +/// +/// When `drop_delta` is `true`, the existing history delta is discarded +/// before the failure event is appended (per `docs/proposals/size-limits.md` +/// § 5.1 tier-2: rejected deltas never reach the provider). Pre-existing +/// limit checks pass `false` to preserve their current behavior of leaving +/// the offending event in the delta alongside the failure. +#[allow(clippy::too_many_arguments)] fn fail_orchestration_for_limit( + resource: &str, message: String, history_mgr: &mut HistoryManager, metadata: &mut ExecutionMetadata, @@ -219,13 +464,42 @@ fn fail_orchestration_for_limit( orchestrator_items: &mut Vec, instance: &str, execution_id: u64, + emit_limit_exceeded_errors: bool, + drop_delta: bool, ) { - let error = crate::ErrorDetails::Application { - kind: crate::AppErrorKind::OrchestrationFailed, - message, - retryable: false, + let error = if emit_limit_exceeded_errors { + crate::ErrorDetails::Configuration { + kind: crate::ConfigErrorKind::LimitExceeded, + resource: resource.to_string(), + message: Some(message), + } + } else { + crate::ErrorDetails::Application { + kind: crate::AppErrorKind::OrchestrationFailed, + message, + retryable: false, + } }; + if drop_delta { + history_mgr.clear_delta(); + // The dropped delta may have included an OrchestrationStarted event that + // caused compute_execution_metadata to set pinned_duroxide_version. Since + // OrchestrationStarted is no longer in the delta, we must clear the pinned + // version to avoid the ack invariant assertion in + // ack_orchestration_with_changes(). + metadata.pinned_duroxide_version = None; + } + + // Always emit the limit-violations counter regardless of the error + // shape selected above. Uses the `metrics` facade so no MetricsProvider + // reference is needed here. + counter!( + "duroxide_limit_violations_total", + "resource" => resource.to_string(), + ) + .increment(1); + let failed_event = Event::with_event_id( history_mgr.next_event_id(), instance, @@ -723,8 +997,82 @@ impl Runtime { instance, execution_id_for_ack, &item.kv_snapshot, + self.options.emit_limit_exceeded_errors, + self.options.enforce_size_limits, ); + // ── Slice-8 metrics ─────────────────────────────────────────────── + // Always-on: payload bytes histogram and history bytes gauge. These + // run regardless of `enforce_size_limits` so operators can observe + // pressure before turning enforcement on. + { + let orch_name_for_metrics = if workitem_reader.has_orchestration_name() { + workitem_reader.orchestration_name.as_str() + } else { + "unknown" + }; + + // Per-payload histograms (always emitted). + self.record_payload_bytes_from_delta(&history_delta_snapshot); + + // History bytes gauge (always emitted). + let proposed_bytes = history_mgr.proposed_total_bytes(); + self.record_history_bytes(orch_name_for_metrics, proposed_bytes); + + // history_terminated_oversize counter — emitted when the last + // event appended to delta is a limit-failure with resource="history". + // We detect this by checking the delta after validate_limits ran. + if let Some(last) = history_mgr.delta().last() { + if let crate::EventKind::OrchestrationFailed { details } = &last.kind { + let is_history = match details { + crate::ErrorDetails::Configuration { + kind: crate::ConfigErrorKind::LimitExceeded, + resource, + .. + } if resource == "history" => true, + // When emit_limit_exceeded_errors=false, history violations + // produce an Application error. Detect by checking the message. + crate::ErrorDetails::Application { message, .. } + if message.contains("Total history size") => + { + true + } + _ => false, + }; + if is_history { + self.record_limit_violation("history", orch_name_for_metrics); + } + } + } + + // 75 % / 90 % WARN thresholds — fire when the baseline crosses the + // threshold this turn (once per threshold per execution). + let max = crate::runtime::limits::MAX_HISTORY_BYTES as u64; + let baseline = history_mgr.baseline_history_bytes(); + let threshold_75 = max * 3 / 4; + let threshold_90 = max * 9 / 10; + if baseline < threshold_75 && proposed_bytes >= threshold_75 { + tracing::warn!( + target: "duroxide::runtime", + instance_id = %instance, + orchestration_name = %orch_name_for_metrics, + total_history_bytes = %proposed_bytes, + limit_bytes = %max, + "Orchestration history at 75% of MAX_HISTORY_BYTES; consider continue_as_new()" + ); + } else if baseline < threshold_90 && proposed_bytes >= threshold_90 { + tracing::warn!( + target: "duroxide::runtime", + instance_id = %instance, + orchestration_name = %orch_name_for_metrics, + total_history_bytes = %proposed_bytes, + limit_bytes = %max, + "Orchestration history at 90% of MAX_HISTORY_BYTES; approaching the history cap" + ); + } + } + // ── end slice-8 metrics ─────────────────────────────────────────── + // Calculate metrics let duration_seconds = start_time.elapsed().as_secs_f64(); turn_count += 1; // At least one turn was processed diff --git a/src/runtime/dispatchers/worker.rs b/src/runtime/dispatchers/worker.rs index 157729d..8ef91e5 100644 --- a/src/runtime/dispatchers/worker.rs +++ b/src/runtime/dispatchers/worker.rs @@ -29,6 +29,7 @@ //! 4. If the activity doesn't complete, it's aborted and the work item is dropped use crate::providers::WorkItem; +use metrics::counter; use std::sync::Arc; use std::sync::atomic::Ordering; use std::time::Duration; @@ -616,6 +617,87 @@ async fn handle_activity_success( let duration_ms = start_time.elapsed().as_millis() as u64; let duration_seconds = duration_ms as f64 / 1000.0; + // Tier-3 size check (docs/proposals/size-limits.md § 5.1). + // + // When enforcement is on, drop oversized outputs before they reach the + // provider and synthesize an `ActivityFailed` instead. The shape of the + // failure obeys `emit_limit_exceeded_errors` so mixed-cluster wire + // formats stay consistent with tier-1/tier-2. + // + // Forensic logging records the size and a 16-hex-char BLAKE3 prefix + // (never the raw payload) so operators can correlate complaints from + // upstream systems without leaking the offending bytes. + if rt.options.enforce_size_limits + && result.len() > crate::runtime::limits::MAX_PAYLOAD_BYTES + { + let hash = blake3::hash(result.as_bytes()); + let hash_hex: String = hash.to_hex().chars().take(16).collect(); + tracing::warn!( + target: "duroxide::runtime", + instance_id = %ctx.instance, + execution_id = %ctx.execution_id, + activity_name = %ctx.activity_name, + activity_id = %ctx.activity_id, + worker_id = %ctx.worker_id, + output_size_bytes = %result.len(), + output_blake3_hex = %hash_hex, + limit_bytes = %crate::runtime::limits::MAX_PAYLOAD_BYTES, + "Activity output exceeds size limit; dropping payload and failing activity" + ); + + let message = format!( + "Activity '{}' output size ({} bytes) exceeds limit ({} bytes)", + ctx.activity_name, + result.len(), + crate::runtime::limits::MAX_PAYLOAD_BYTES, + ); + let details = if rt.options.emit_limit_exceeded_errors { + crate::ErrorDetails::Configuration { + kind: crate::ConfigErrorKind::LimitExceeded, + resource: format!("activity_output:{}", ctx.activity_name), + message: Some(message), + } + } else { + crate::ErrorDetails::Application { + kind: crate::AppErrorKind::ActivityFailed, + message, + retryable: false, + } + }; + // Drop the oversized payload before crossing the await boundary. + drop(result); + + // Emit limit-violations counter (tier-3, metrics facade). + counter!( + "duroxide_limit_violations_total", + "resource" => format!("activity_output:{}", ctx.activity_name), + ) + .increment(1); + + rt.record_activity_execution( + &ctx.activity_name, + "limit_exceeded", + duration_seconds, + 0, + ctx.tag.as_deref(), + ); + + let ack_result = rt + .history_store + .ack_work_item( + &ctx.lock_token, + Some(WorkItem::ActivityFailed { + instance: ctx.instance.clone(), + execution_id: ctx.execution_id, + id: ctx.activity_id, + details, + }), + ) + .await; + + return (ack_result, ActivityOutcome::AppError); + } + tracing::debug!( target: "duroxide::runtime", instance_id = %ctx.instance, @@ -658,6 +740,41 @@ async fn handle_activity_error( let duration_ms = start_time.elapsed().as_millis() as u64; let duration_seconds = duration_ms as f64 / 1000.0; + // Tier-3 error string check (docs/proposals/size-limits.md § 5.1). + // + // Per spec, the error string is **truncated** rather than failed: + // failing-on-failure obscures the original cause. Truncation preserves + // the information that an activity failed while bounding the bytes + // we persist into history. + let error = if rt.options.enforce_size_limits + && error.len() > crate::runtime::limits::MAX_SMALL_VALUE_BYTES + { + let original_len = error.len(); + let limit = crate::runtime::limits::MAX_SMALL_VALUE_BYTES; + // Truncate at a UTF-8 boundary at or below `limit`. + let mut cut = limit; + while cut > 0 && !error.is_char_boundary(cut) { + cut -= 1; + } + let mut truncated = String::with_capacity(cut + 32); + truncated.push_str(&error[..cut]); + truncated.push_str("... [truncated]"); + tracing::warn!( + target: "duroxide::runtime", + instance_id = %ctx.instance, + execution_id = %ctx.execution_id, + activity_name = %ctx.activity_name, + activity_id = %ctx.activity_id, + worker_id = %ctx.worker_id, + error_size_bytes = %original_len, + limit_bytes = %limit, + "Activity error message exceeds size limit; truncating" + ); + truncated + } else { + error + }; + tracing::warn!( target: "duroxide::runtime", instance_id = %ctx.instance, diff --git a/src/runtime/limits.rs b/src/runtime/limits.rs index 4faf121..b4d2e7b 100644 --- a/src/runtime/limits.rs +++ b/src/runtime/limits.rs @@ -43,3 +43,57 @@ pub const MAX_KV_KEYS: usize = 150; /// /// Enforced in `validate_limits()` by scanning `KeyValueSet` events in the history delta. pub const MAX_KV_VALUE_BYTES: usize = 64 * 1024; + +// ============================================================================= +// Size limits — see `docs/proposals/size-limits.md` +// +// These three constants cover every value-bearing call site not already +// governed by an existing constant above. Enforcement and error shape are +// gated by `RuntimeOptions::enforce_size_limits` and +// `RuntimeOptions::emit_limit_exceeded_errors` respectively (defaults `false` +// in the introducing release; defaults flip in a later release). +// ============================================================================= + +/// Maximum total serialized size of an execution's history, in bytes. +/// +/// When `RuntimeOptions::enforce_size_limits` is true, the orchestration +/// dispatcher checks before each ack whether appending the proposed +/// `history_delta` would push `total_history_bytes` past this cap. If it +/// would, the delta is **discarded in memory** and the orchestration is +/// failed with a single terminal `OrchestrationFailed` event whose +/// `details.resource = "history"`. The oversized delta never reaches the +/// provider. +/// +/// Orchestration code can read the running total via +/// `OrchestrationContext::history_size_bytes()` and roll over with +/// `continue_as_new()` before hitting this cap. +/// +/// 5 MiB — comfortably below every reference provider's per-row aggregate +/// limits, and large enough that typical workflows never approach it. +pub const MAX_HISTORY_BYTES: usize = 5 * 1024 * 1024; + +/// Maximum size of a "large payload" — the values that flow through +/// orchestration logic and live as a single event in history. +/// +/// Covers: activity input/output, orchestration input/output, +/// sub-orchestration input/output, `continue_as_new` carry-forward input. +/// +/// 3 MiB — over half of `MAX_HISTORY_BYTES` so a single big payload is +/// permitted, but two of them in the same execution will trip the history +/// cap and force a `continue_as_new` decision. +pub const MAX_PAYLOAD_BYTES: usize = 3 * 1024 * 1024; + +/// Maximum size for "small values" — short strings and discrete signals +/// that are not currently capped by an existing constant. +/// +/// Covers: orchestration name, activity name, sub-orchestration name, +/// event name, queue name, instance ID, session ID, external event payload, +/// queue message, error message, cancel reason. +/// +/// Does **not** apply to values already governed by an existing constant +/// (`MAX_CUSTOM_STATUS_BYTES`, `MAX_KV_VALUE_BYTES`, `MAX_TAG_NAME_BYTES`). +/// +/// 64 KiB — same order as `MAX_KV_VALUE_BYTES`, generous enough that +/// reasonable names/IDs (bytes-to-hundreds-of-bytes) are never near the cap, +/// and large enough for stack traces and moderate event payloads. +pub const MAX_SMALL_VALUE_BYTES: usize = 64 * 1024; diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index fef0b09..c9e4ce8 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -342,6 +342,44 @@ pub struct RuntimeOptions { /// /// Default: `TagFilter::DefaultOnly` (untagged activities only) pub worker_tag_filter: crate::providers::TagFilter, + + // ========================================================================= + // Size limits — see `docs/proposals/size-limits.md` + // ========================================================================= + + /// When `true`, the runtime enforces [`crate::runtime::limits::MAX_HISTORY_BYTES`], + /// [`crate::runtime::limits::MAX_PAYLOAD_BYTES`], and + /// [`crate::runtime::limits::MAX_SMALL_VALUE_BYTES`] at all three tiers + /// (Client API, orchestration turn, worker output). + /// + /// When `false` (default), the runtime **measures** every value-bearing + /// call site and updates metrics (`duroxide.payload.bytes`, + /// `duroxide.history.bytes`) but never fails for a new-cap violation. + /// Use the off state to observe population pressure (via + /// `duroxide.history.bytes` and `OrchestrationContext::history_size_bytes()`) + /// and refactor at-risk orchestrations *before* turning enforcement on. + /// + /// Independent of `emit_limit_exceeded_errors`. + /// + /// Default: `false` + pub enforce_size_limits: bool, + + /// When `true`, every limit failure (both pre-existing and new) emits + /// `ErrorDetails::Configuration { kind: ConfigErrorKind::LimitExceeded, + /// resource, message }`. + /// + /// When `false` (default), every limit failure emits + /// `ErrorDetails::Application { kind: OrchestrationFailed, retryable: false, + /// message }` — the shape pre-existing limits use today, recognizable to + /// every prior duroxide version. Leave off during a rolling upgrade until + /// every node in the cluster has been upgraded to a duroxide version that + /// recognizes the `LimitExceeded` variant; otherwise older nodes hit the + /// existing `FailedDeserialization` poison path on the unknown variant. + /// + /// Independent of `enforce_size_limits`. + /// + /// Default: `false` + pub emit_limit_exceeded_errors: bool, } impl Default for RuntimeOptions { @@ -367,6 +405,8 @@ impl Default for RuntimeOptions { max_sessions_per_runtime: 10, worker_node_id: None, worker_tag_filter: crate::providers::TagFilter::default(), + enforce_size_limits: false, + emit_limit_exceeded_errors: false, } } } @@ -658,6 +698,63 @@ impl Runtime { } } + /// Emit `duroxide_limit_violations_total { resource }` and, when the + /// resource is `"history"`, also `duroxide_history_terminated_oversize_total`. + #[inline] + pub(crate) fn record_limit_violation(&self, resource: &str, orchestration_name: &str) { + if let Some(provider) = self.metrics_provider() { + provider.record_limit_violation(resource); + if resource == "history" { + provider.record_history_terminated_oversize(orchestration_name); + } + } + } + + /// Emit `duroxide_history_bytes { orchestration_name }` after a committed turn. + /// Always emitted independent of `enforce_size_limits`. + #[inline] + pub(crate) fn record_history_bytes(&self, orchestration_name: &str, bytes: u64) { + if let Some(provider) = self.metrics_provider() { + provider.record_history_bytes(orchestration_name, bytes); + } + } + + /// Emit `duroxide_payload_bytes { kind }` for each payload-bearing event in a + /// freshly-produced history delta. Always emitted independent of `enforce_size_limits`. + pub(crate) fn record_payload_bytes_from_delta(&self, delta: &[crate::Event]) { + let provider = match self.metrics_provider() { + Some(p) => p, + None => return, + }; + use crate::EventKind; + for event in delta { + match &event.kind { + EventKind::OrchestrationStarted { input, .. } => { + provider.record_payload_bytes("orchestration_input", input.len()); + } + EventKind::ActivityScheduled { name, input, .. } => { + provider.record_payload_bytes(&format!("activity_input:{name}"), input.len()); + } + EventKind::ActivityCompleted { result, .. } => { + provider.record_payload_bytes("activity_output", result.len()); + } + EventKind::SubOrchestrationScheduled { name, input, .. } => { + provider.record_payload_bytes(&format!("sub_orch_input:{name}"), input.len()); + } + EventKind::SubOrchestrationCompleted { result, .. } => { + provider.record_payload_bytes("sub_orch_output", result.len()); + } + EventKind::OrchestrationCompleted { output } => { + provider.record_payload_bytes("orchestration_output", output.len()); + } + EventKind::OrchestrationContinuedAsNew { input, .. } => { + provider.record_payload_bytes("continue_as_new_input", input.len()); + } + _ => {} + } + } + } + pub fn metrics_snapshot(&self) -> Option { self.observability_handle .as_ref() diff --git a/src/runtime/observability.rs b/src/runtime/observability.rs index 090ae34..7d9a6e6 100644 --- a/src/runtime/observability.rs +++ b/src/runtime/observability.rs @@ -135,6 +135,12 @@ pub struct MetricsSnapshot { pub orch_continue_as_new: u64, pub suborchestration_calls: u64, pub provider_errors: u64, + /// Total number of size-limit violations recorded across all tiers + /// (tier-1 client, tier-2 orchestration turn, tier-3 worker output). + pub limit_violations: u64, + /// Number of orchestrations terminated because their history exceeded + /// `MAX_HISTORY_BYTES`. + pub history_terminated_oversize: u64, } /// Metrics provider using the `metrics` facade crate. @@ -172,6 +178,10 @@ pub struct MetricsProvider { // Active orchestrations tracking (for gauge metrics) active_orchestrations_atomic: Arc, + + // Size-limit metrics + limit_violations_atomic: AtomicU64, + history_terminated_oversize_atomic: AtomicU64, } impl MetricsProvider { @@ -202,6 +212,8 @@ impl MetricsProvider { orch_queue_depth_atomic: Arc::new(AtomicU64::new(0)), worker_queue_depth_atomic: Arc::new(AtomicU64::new(0)), active_orchestrations_atomic: Arc::new(AtomicI64::new(0)), + limit_violations_atomic: AtomicU64::new(0), + history_terminated_oversize_atomic: AtomicU64::new(0), }) } @@ -627,8 +639,73 @@ impl MetricsProvider { orch_continue_as_new: self.orch_continue_as_new_atomic.load(Ordering::Relaxed), suborchestration_calls: self.suborchestration_calls_atomic.load(Ordering::Relaxed), provider_errors: self.provider_errors_atomic.load(Ordering::Relaxed), + limit_violations: self.limit_violations_atomic.load(Ordering::Relaxed), + history_terminated_oversize: self.history_terminated_oversize_atomic.load(Ordering::Relaxed), } } + + // ======================================================================== + // Size-limit metrics + // ======================================================================== + + /// Record a size-limit violation at any tier (1, 2, or 3). + /// + /// Emits `duroxide_limit_violations_total { resource }` via the `metrics` + /// facade. `resource` is the stable identifier from + /// `docs/proposals/size-limits.md` (e.g. `"history"`, + /// `"activity_input:MyActivity"`). + #[inline] + pub fn record_limit_violation(&self, resource: &str) { + counter!( + "duroxide_limit_violations_total", + "resource" => resource.to_string(), + ) + .increment(1); + self.limit_violations_atomic.fetch_add(1, Ordering::Relaxed); + } + + /// Record a history-oversize termination for `orchestration_name`. + /// + /// Emits `duroxide_history_terminated_oversize_total { orchestration_name }`. + /// Called alongside [`record_limit_violation`] when `resource == "history"`. + #[inline] + pub fn record_history_terminated_oversize(&self, orchestration_name: &str) { + counter!( + "duroxide_history_terminated_oversize_total", + "orchestration_name" => orchestration_name.to_string(), + ) + .increment(1); + self.history_terminated_oversize_atomic.fetch_add(1, Ordering::Relaxed); + } + + /// Record the serialized history size for an orchestration after a committed turn. + /// + /// Emits `duroxide_history_bytes { orchestration_name }` as a histogram so + /// operators can observe population pressure across all running instances. + /// Always called — independent of `enforce_size_limits`. + #[inline] + pub fn record_history_bytes(&self, orchestration_name: &str, bytes: u64) { + histogram!( + "duroxide_history_bytes", + "orchestration_name" => orchestration_name.to_string(), + ) + .record(bytes as f64); + } + + /// Record the byte size of a single payload value at a named call site. + /// + /// Emits `duroxide_payload_bytes { kind }` as a histogram. `kind` is the + /// resource identifier string from `docs/proposals/size-limits.md` (e.g. + /// `"activity_input:FetchBatch"`, `"orchestration_input"`). + /// Always called — independent of `enforce_size_limits`. + #[inline] + pub fn record_payload_bytes(&self, kind: &str, bytes: usize) { + histogram!( + "duroxide_payload_bytes", + "kind" => kind.to_string(), + ) + .record(bytes as f64); + } } /// Initialize logging subsystem diff --git a/src/runtime/replay_engine.rs b/src/runtime/replay_engine.rs index 4b40b82..c02c132 100644 --- a/src/runtime/replay_engine.rs +++ b/src/runtime/replay_engine.rs @@ -552,6 +552,16 @@ impl ReplayEngine { } } + // Seed history_size_bytes for OrchestrationContext::history_size_bytes(). + // We expose the size of working_history (baseline + completion-derived + // events that exist before user code runs), which is stable for the + // duration of the turn and identical across replays of the same turn. + let history_size_bytes: usize = working_history + .iter() + .map(|e| crate::runtime::state_helpers::serialized_event_size(e) as usize) + .sum(); + ctx.set_history_size_bytes(history_size_bytes); + let ctx_for_future = ctx.clone(); let h = handler.clone(); let inp = input.clone(); diff --git a/src/runtime/state_helpers.rs b/src/runtime/state_helpers.rs index 0d2e210..027cc08 100644 --- a/src/runtime/state_helpers.rs +++ b/src/runtime/state_helpers.rs @@ -4,6 +4,16 @@ use crate::{ }; use tracing::warn; +/// Compute the serialized JSON size of an event in bytes. +/// +/// Matches the encoding used by the SQLite provider when persisting events, +/// so the running totals on `HistoryManager` are within a few bytes of the +/// on-disk row size. Falls back to `0` only on serialization failure, which +/// is treated as a programming error elsewhere in the codebase. +pub(crate) fn serialized_event_size(event: &Event) -> u64 { + serde_json::to_vec(event).map(|v| v.len() as u64).unwrap_or(0) +} + /// Reader for extracting metadata from orchestration history /// /// This struct provides convenient access to key information derived from @@ -42,6 +52,21 @@ pub struct HistoryManager { /// New events to be appended (history delta) delta: Vec, + + /// Running total of serialized bytes in `history` (baseline only). + /// + /// Computed once in `from_history()` and never updated thereafter — the + /// baseline is immutable for the lifetime of a `HistoryManager`. Read by + /// `OrchestrationContext::history_size_bytes()` to give orchestration + /// code a deterministic view of how close it is to `MAX_HISTORY_BYTES`. + history_bytes: u64, + + /// Running total of serialized bytes in `delta` (events appended this turn). + /// + /// Maintained incrementally on `append`/`extend` so the pre-persist + /// history-cap check (`history_bytes + delta_bytes <= MAX_HISTORY_BYTES`) + /// is `O(1)` per event added. + delta_bytes: u64, } impl HistoryManager { @@ -50,6 +75,10 @@ impl HistoryManager { /// Scans through the history (in reverse for terminal states) to extract /// commonly needed information. pub fn from_history(history: &[Event]) -> Self { + // Pre-compute the total serialized size of the baseline history. + // Uses the same JSON encoding the SQLite provider stores, so the + // counter matches on-disk row sizes within a few bytes of overhead. + let history_bytes: u64 = history.iter().map(serialized_event_size).sum(); let mut metadata = Self { orchestration_name: None, orchestration_version: None, @@ -62,6 +91,8 @@ impl HistoryManager { current_execution_id: None, history: history.to_vec(), delta: Vec::new(), + history_bytes, + delta_bytes: 0, }; // Scan forward for OrchestrationStarted (could be multiple due to CAN) @@ -160,6 +191,7 @@ impl HistoryManager { /// Append a single event to the delta pub fn append(&mut self, event: Event) { + self.delta_bytes = self.delta_bytes.saturating_add(serialized_event_size(&event)); self.delta.push(event); } @@ -179,14 +211,53 @@ impl HistoryManager { /// Extend delta with multiple events pub fn extend(&mut self, events: Vec) { + for event in &events { + self.delta_bytes = self.delta_bytes.saturating_add(serialized_event_size(event)); + } self.delta.extend(events); } + /// Total serialized bytes of the baseline history (excludes this turn's delta). + /// + /// This is the value surfaced to orchestration code via + /// `OrchestrationContext::history_size_bytes()`. It is deterministic across + /// replay because the baseline history is fixed at construction time. + pub fn baseline_history_bytes(&self) -> u64 { + self.history_bytes + } + + /// Total serialized bytes of the delta accumulated this turn. + /// + /// Maintained incrementally as events are appended. + pub fn delta_bytes(&self) -> u64 { + self.delta_bytes + } + + /// Proposed total bytes if the current delta were to be persisted + /// (`baseline_history_bytes() + delta_bytes()`). + /// + /// Used by the pre-persist `MAX_HISTORY_BYTES` check. + pub fn proposed_total_bytes(&self) -> u64 { + self.history_bytes.saturating_add(self.delta_bytes) + } + /// Get a reference to the history delta pub fn delta(&self) -> &[Event] { &self.delta } + /// Discard all events accumulated in the delta this turn and reset the + /// delta byte counter. + /// + /// Used by the dispatcher when a tier-2 limit violation is detected: per + /// `docs/proposals/size-limits.md` § 5.1, oversized deltas are dropped + /// in memory before a single terminal `OrchestrationFailed` event is + /// appended, so no part of the rejected work reaches the provider. + pub fn clear_delta(&mut self) { + self.delta.clear(); + self.delta_bytes = 0; + } + /// Consume the manager and return the history delta pub fn into_delta(self) -> Vec { self.delta diff --git a/tests/scenarios.rs b/tests/scenarios.rs index 86437ce..3e5b356 100644 --- a/tests/scenarios.rs +++ b/tests/scenarios.rs @@ -39,3 +39,6 @@ mod replay_versioning; #[path = "scenarios/copilot_chat.rs"] mod copilot_chat; + +#[path = "scenarios/limits.rs"] +mod limits; diff --git a/tests/scenarios/limits.rs b/tests/scenarios/limits.rs new file mode 100644 index 0000000..e04e29a --- /dev/null +++ b/tests/scenarios/limits.rs @@ -0,0 +1,871 @@ +//! Size & shape limits regression tests — spec §8 +//! +//! Covers all four toggle combinations +//! (`enforce_size_limits` × `emit_limit_exceeded_errors`) and every test row +//! listed in `docs/proposals/size-limits.md` §8. +//! +//! References: +//! - `docs/proposals/size-limits.md` +//! - `docs/proposals-impl/PROGRESS-size-limits.md` + +#![allow(clippy::unwrap_used)] +#![allow(clippy::expect_used)] + +use duroxide::runtime::limits::{MAX_HISTORY_BYTES, MAX_PAYLOAD_BYTES, MAX_SMALL_VALUE_BYTES}; +use duroxide::runtime::{self, RuntimeOptions}; +use duroxide::{ActivityContext, Client, ConfigErrorKind, ErrorDetails, OrchestrationContext, OrchestrationRegistry, OrchestrationStatus}; +use duroxide::runtime::registry::ActivityRegistry; +use std::sync::Arc; +use std::time::Duration; + +#[path = "../common/mod.rs"] +mod common; + +// --------------------------------------------------------------------------- +// Helper: build RuntimeOptions with specific toggle state +// --------------------------------------------------------------------------- + +fn opts(enforce: bool, emit_le: bool) -> RuntimeOptions { + RuntimeOptions { + enforce_size_limits: enforce, + emit_limit_exceeded_errors: emit_le, + orchestration_concurrency: 1, + worker_concurrency: 1, + ..Default::default() + } +} + +/// Wait for orchestration and panic on timeout. +async fn wait(client: &Client, instance: &str) -> OrchestrationStatus { + client + .wait_for_orchestration(instance, Duration::from_secs(15)) + .await + .unwrap_or_else(|e| panic!("wait_for_orchestration error: {e}")) +} + +// --------------------------------------------------------------------------- +// Test: enforce=off → large payloads succeed (baseline: no regressions) +// --------------------------------------------------------------------------- + +/// `enforce_off_measures_but_does_not_fail` +/// +/// With enforcement off, an activity returning a 4 MiB payload should succeed. +#[tokio::test] +async fn enforce_off_does_not_fail_large_activity_output() { + let (store, _td) = common::create_sqlite_store_disk().await; + + let big_output = "x".repeat(MAX_PAYLOAD_BYTES + 1); + let big_output_clone = big_output.clone(); + + let activity_registry = ActivityRegistry::builder() + .register("BigOutput", move |_ctx: ActivityContext, _input: String| { + let out = big_output_clone.clone(); + async move { Ok(out) } + }) + .build(); + + let orch_reg = OrchestrationRegistry::builder() + .register("BigOutputOrch", |ctx: OrchestrationContext, _: String| async move { + let result = ctx.schedule_activity("BigOutput", String::new()).await?; + Ok(result) + }) + .build(); + + let rt = runtime::Runtime::start_with_options( + store.clone(), activity_registry, orch_reg, opts(false, false), + ).await; + + let client = Client::new(store.clone()); + client.start_orchestration("big-output-off", "BigOutputOrch", "").await.unwrap(); + + let status = wait(&client, "big-output-off").await; + assert!( + matches!(status, OrchestrationStatus::Completed { .. }), + "expected Completed with enforce=off, got {status:?}" + ); + + rt.shutdown(None).await; +} + +// --------------------------------------------------------------------------- +// Tier-1 client checks +// --------------------------------------------------------------------------- + +/// `orchestration_input_too_large` +/// +/// With enforcement on, starting an orchestration with 4 MiB input returns +/// `Err(ClientError::Configuration)`. Nothing is written to the provider. +#[tokio::test] +async fn tier1_orchestration_input_too_large() { + let (store, _td) = common::create_sqlite_store_disk().await; + + let client = Client::new(store.clone()).with_size_limits_enforced(); + let big_input = "x".repeat(MAX_PAYLOAD_BYTES + 1); + + let err = client + .start_orchestration("inst", "SomeOrch", big_input) + .await + .expect_err("should reject oversized input"); + + assert!( + matches!(err, duroxide::ClientError::Configuration { ref resource, .. } if resource == "orchestration_input"), + "unexpected error: {err:?}" + ); +} + +/// `external_event_too_large` +/// +/// `raise_event` with a 100 KiB + 1 payload is rejected at the client. +#[tokio::test] +async fn tier1_external_event_too_large() { + let (store, _td) = common::create_sqlite_store_disk().await; + + let client = Client::new(store.clone()).with_size_limits_enforced(); + let big_payload = "x".repeat(MAX_SMALL_VALUE_BYTES + 1); + + let err = client + .raise_event("inst", "MyEvent", big_payload) + .await + .expect_err("should reject oversized event payload"); + + assert!( + matches!(err, duroxide::ClientError::Configuration { ref resource, .. } if resource.starts_with("external_event:")), + "unexpected error: {err:?}" + ); +} + +/// `instance_id_too_long` +/// +/// Starting an orchestration with an instance ID > `MAX_SMALL_VALUE_BYTES` bytes +/// is rejected at the client. +#[tokio::test] +async fn tier1_instance_id_too_long() { + let (store, _td) = common::create_sqlite_store_disk().await; + + let client = Client::new(store.clone()).with_size_limits_enforced(); + let long_id = "x".repeat(MAX_SMALL_VALUE_BYTES + 1); + + let err = client + .start_orchestration(long_id, "SomeOrch", "") + .await + .expect_err("should reject oversized instance_id"); + + assert!( + matches!(err, duroxide::ClientError::Configuration { ref resource, .. } if resource == "identifier:instance_id"), + "unexpected error: {err:?}" + ); +} + +// --------------------------------------------------------------------------- +// Tier-2: per-event payload checks +// --------------------------------------------------------------------------- + +/// `activity_input_too_large` +/// +/// Scheduling an activity with a 4 MiB input causes a tier-2 failure. +/// With `emit_limit_exceeded_errors=true` the error is `Configuration::LimitExceeded`. +/// The oversized delta is never persisted; instance terminates. +#[tokio::test] +async fn tier2_activity_input_too_large_emit_on() { + let (store, _td) = common::create_sqlite_store_disk().await; + + let big_input = "x".repeat(MAX_PAYLOAD_BYTES + 1); + + let activity_registry = ActivityRegistry::builder() + .register("Noop", |_ctx: ActivityContext, _: String| async move { Ok("ok".to_string()) }) + .build(); + + let orch_reg = OrchestrationRegistry::builder() + .register("BigInputOrch", move |ctx: OrchestrationContext, _: String| { + let inp = big_input.clone(); + async move { + let _ = ctx.schedule_activity("Noop", inp).await?; + Ok("done".to_string()) + } + }) + .build(); + + let rt = runtime::Runtime::start_with_options( + store.clone(), activity_registry, orch_reg, opts(true, true), + ).await; + + let client = Client::new(store.clone()); + client.start_orchestration("big-act-input", "BigInputOrch", "").await.unwrap(); + + let status = wait(&client, "big-act-input").await; + + match status { + OrchestrationStatus::Failed { details, .. } => { + assert!( + matches!( + &details, + ErrorDetails::Configuration { + kind: ConfigErrorKind::LimitExceeded, + resource, + .. + } if resource.starts_with("activity_input:") + ), + "expected LimitExceeded(activity_input:*), got {details:?}" + ); + } + other => panic!("expected Failed, got {other:?}"), + } + + rt.shutdown(None).await; +} + +/// Same scenario with `emit_limit_exceeded_errors=false`: +/// failure uses Application shape (mixed-cluster-safe). +#[tokio::test] +async fn tier2_activity_input_too_large_emit_off() { + let (store, _td) = common::create_sqlite_store_disk().await; + + let big_input = "x".repeat(MAX_PAYLOAD_BYTES + 1); + + let activity_registry = ActivityRegistry::builder() + .register("Noop", |_ctx: ActivityContext, _: String| async move { Ok("ok".to_string()) }) + .build(); + + let orch_reg = OrchestrationRegistry::builder() + .register("BigInputOrch2", move |ctx: OrchestrationContext, _: String| { + let inp = big_input.clone(); + async move { + let _ = ctx.schedule_activity("Noop", inp).await?; + Ok("done".to_string()) + } + }) + .build(); + + let rt = runtime::Runtime::start_with_options( + store.clone(), activity_registry, orch_reg, opts(true, false), + ).await; + + let client = Client::new(store.clone()); + client.start_orchestration("big-act-input-app", "BigInputOrch2", "").await.unwrap(); + + let status = wait(&client, "big-act-input-app").await; + + match status { + OrchestrationStatus::Failed { details, .. } => { + assert!( + matches!(&details, ErrorDetails::Application { .. }), + "expected Application shape with emit_le=false, got {details:?}" + ); + } + other => panic!("expected Failed, got {other:?}"), + } + + rt.shutdown(None).await; +} + +// --------------------------------------------------------------------------- +// Tier-2: side-effect drops +// --------------------------------------------------------------------------- + +/// `tier2_failure_drops_side_effects` +/// +/// An orchestration that tries to schedule an activity WITH an oversized input +/// in the same turn. The activity should NOT be enqueued — the rejected delta +/// drops all side effects. +#[tokio::test] +async fn tier2_failure_drops_side_effects() { + let (store, _td) = common::create_sqlite_store_disk().await; + + let big_input = "x".repeat(MAX_PAYLOAD_BYTES + 1); + + // A counter to detect whether the activity actually ran. + let ran = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let ran_clone = ran.clone(); + + let activity_registry = ActivityRegistry::builder() + .register("SideEffect", move |_ctx: ActivityContext, _: String| { + ran_clone.store(true, std::sync::atomic::Ordering::SeqCst); + async move { Ok("ran".to_string()) } + }) + .build(); + + let orch_reg = OrchestrationRegistry::builder() + .register("SideEffectOrch", move |ctx: OrchestrationContext, _: String| { + let inp = big_input.clone(); + async move { + // This schedules an activity with an oversized input. + // The tier-2 check should reject the whole delta, + // so the activity enqueue is also dropped. + let _ = ctx.schedule_activity("SideEffect", inp).await?; + Ok("done".to_string()) + } + }) + .build(); + + let rt = runtime::Runtime::start_with_options( + store.clone(), activity_registry, orch_reg, opts(true, true), + ).await; + + let client = Client::new(store.clone()); + client.start_orchestration("side-effect-drop", "SideEffectOrch", "").await.unwrap(); + + let status = wait(&client, "side-effect-drop").await; + + assert!( + matches!(status, OrchestrationStatus::Failed { .. }), + "expected Failed, got {status:?}" + ); + + // Give the worker dispatcher a brief window to process any stray activity enqueue. + tokio::time::sleep(Duration::from_millis(200)).await; + + assert!( + !ran.load(std::sync::atomic::Ordering::SeqCst), + "side-effect activity should not have run (delta was dropped)" + ); + + rt.shutdown(None).await; +} + +// --------------------------------------------------------------------------- +// Tier-3: activity output too large +// --------------------------------------------------------------------------- + +/// `activity_output_too_large` +/// +/// Activity returns > MAX_PAYLOAD_BYTES. With enforcement on and +/// `emit_limit_exceeded_errors=true`, the instance terminates with +/// `Configuration::LimitExceeded { resource: "activity_output:*" }`. +/// Raw output is never logged. +#[tokio::test] +async fn tier3_activity_output_too_large_emit_on() { + let (store, _td) = common::create_sqlite_store_disk().await; + + let big_out = "y".repeat(MAX_PAYLOAD_BYTES + 1); + + let activity_registry = ActivityRegistry::builder() + .register("HugeOutput", move |_ctx: ActivityContext, _: String| { + let out = big_out.clone(); + async move { Ok(out) } + }) + .build(); + + let orch_reg = OrchestrationRegistry::builder() + .register("HugeOutputOrch", |ctx: OrchestrationContext, _: String| async move { + let result = ctx.schedule_activity("HugeOutput", String::new()).await?; + Ok(result) + }) + .build(); + + let rt = runtime::Runtime::start_with_options( + store.clone(), activity_registry, orch_reg, opts(true, true), + ).await; + + let client = Client::new(store.clone()); + client.start_orchestration("huge-out", "HugeOutputOrch", "").await.unwrap(); + + let status = wait(&client, "huge-out").await; + + match status { + OrchestrationStatus::Failed { details, .. } => { + assert!( + matches!( + &details, + ErrorDetails::Configuration { + kind: ConfigErrorKind::LimitExceeded, + resource, + .. + } if resource.starts_with("activity_output:") + ), + "expected LimitExceeded(activity_output:*), got {details:?}" + ); + } + other => panic!("expected Failed, got {other:?}"), + } + + rt.shutdown(None).await; +} + +// --------------------------------------------------------------------------- +// Aggregate history cap +// --------------------------------------------------------------------------- + +/// `history_cap_terminates_instance` +/// +/// An orchestration that loops, scheduling activities with ~1 MiB outputs +/// until the aggregate exceeds `MAX_HISTORY_BYTES` (5 MiB). The instance +/// must terminate with a limit failure before exceeding the cap. +/// +/// We use a modest payload size (~500 KiB × 12 iterations = ~6 MiB) to +/// reliably cross the cap without making the test huge. +#[tokio::test] +async fn history_cap_terminates_instance() { + const CHUNK: usize = 512 * 1024; // 512 KiB per iteration — 11 iterations ≈ 5.5 MiB + + let (store, _td) = common::create_sqlite_store_disk().await; + + let chunk_out = "z".repeat(CHUNK); + + let activity_registry = ActivityRegistry::builder() + .register("Chunk", move |_ctx: ActivityContext, _: String| { + let out = chunk_out.clone(); + async move { Ok(out) } + }) + .build(); + + let orch_reg = OrchestrationRegistry::builder() + .register("HistoryCapOrch", |ctx: OrchestrationContext, _: String| async move { + loop { + let _ = ctx.schedule_activity("Chunk", String::new()).await?; + } + }) + .build(); + + let rt = runtime::Runtime::start_with_options( + store.clone(), activity_registry, orch_reg, opts(true, true), + ).await; + + let client = Client::new(store.clone()); + client.start_orchestration("hist-cap", "HistoryCapOrch", "").await.unwrap(); + + // Wait up to 30 s — the loop terminates once the cap is hit. + let status = client + .wait_for_orchestration("hist-cap", Duration::from_secs(30)) + .await + .unwrap(); + + match status { + OrchestrationStatus::Failed { details, .. } => { + assert!( + matches!( + &details, + ErrorDetails::Configuration { + kind: ConfigErrorKind::LimitExceeded, + resource, + .. + } if resource == "history" + ) || matches!(&details, ErrorDetails::Application { message, .. } if message.contains("Total history size")), + "expected history-cap failure, got {details:?}" + ); + } + other => panic!("expected Failed, got {other:?}"), + } + + rt.shutdown(None).await; +} + +// --------------------------------------------------------------------------- +// history_size_bytes() determinism +// --------------------------------------------------------------------------- + +/// `history_size_bytes_is_deterministic` +/// +/// Validates that: +/// - `history_size_bytes()` is > 0 once history has been accumulated +/// - `history_size_bytes()` is stable within a single turn (reads return the +/// same value because the baseline is fixed at turn-start) +/// - `history_pressure()` is in `[0.0, 1.0]` +/// - The value grows across `continue_as_new` generations (different baselines) +#[tokio::test] +async fn history_size_bytes_is_deterministic() { + let (store, _td) = common::create_sqlite_store_disk().await; + + let activity_registry = ActivityRegistry::builder() + .register("Echo", |_ctx: ActivityContext, input: String| async move { Ok(input) }) + .build(); + + // Generation counter: on gen=0 we build some history then CAN; on gen=1 we record + // the size and finish. The size on gen=1 should be > 0. + let orch_reg = OrchestrationRegistry::builder() + .register("SizeBytesOrch", |ctx: OrchestrationContext, input: String| async move { + let generation: u32 = input.parse().unwrap_or(0); + if generation == 0 { + // Do some work to build baseline history. + let _r1 = ctx.schedule_activity("Echo", "step1".to_string()).await?; + let _r2 = ctx.schedule_activity("Echo", "step2".to_string()).await?; + // Within this turn both reads are identical (stable per-turn baseline). + let size_a = ctx.history_size_bytes(); + let size_b = ctx.history_size_bytes(); + assert_eq!(size_a, size_b, "history_size_bytes() must be stable within a turn"); + assert!(size_a > 0, "size should be > 0 after activities"); + // Roll over — the new execution has a smaller baseline (just OrchestrationStarted). + return ctx.continue_as_new("1".to_string()).await; + } + // Gen 1: history is just the new OrchestrationStarted. + let size1 = ctx.history_size_bytes(); + // Still > 0 (OrchestrationStarted is in the baseline). + assert!(size1 > 0, "size1 should be > 0 on gen=1"); + let pressure = ctx.history_pressure(); + assert!((0.0..=1.0).contains(&pressure), "pressure {pressure} out of range"); + Ok(format!("size={size1},pressure={pressure:.6}")) + }) + .build(); + + let rt = runtime::Runtime::start_with_options( + store.clone(), activity_registry, orch_reg, opts(false, false), + ).await; + + let client = Client::new(store.clone()); + client.start_orchestration("size-det", "SizeBytesOrch", "").await.unwrap(); + + let status = wait(&client, "size-det").await; + assert!( + matches!(status, OrchestrationStatus::Completed { .. }), + "expected Completed, got {status:?}" + ); + + rt.shutdown(None).await; +} + +// --------------------------------------------------------------------------- +// history_pressure() drives continue_as_new +// --------------------------------------------------------------------------- + +/// `history_pressure_drives_continue_as_new` +/// +/// An orchestration that calls `continue_as_new()` when pressure > a very low +/// artificial threshold (achieved by inserting enough activity calls). +/// Verifies the orchestration eventually succeeds after multiple generations, +/// each staying below the cap. +/// +/// We use a 1% threshold here to keep the test fast — the important property +/// is that `history_pressure()` is readable and > 0. +#[tokio::test] +async fn history_pressure_drives_continue_as_new() { + let (store, _td) = common::create_sqlite_store_disk().await; + + let activity_registry = ActivityRegistry::builder() + .register("Bump", |_ctx: ActivityContext, input: String| async move { + let n: u32 = input.parse().unwrap_or(0); + Ok((n + 1).to_string()) + }) + .build(); + + let orch_reg = OrchestrationRegistry::builder() + .register("PressureOrch", |ctx: OrchestrationContext, input: String| async move { + let generation: u32 = input.parse().unwrap_or(0); + if generation >= 3 { + return Ok("done after 3 generations".to_string()); + } + // Do some work to build up a little history. + let n = ctx.schedule_activity("Bump", generation.to_string()).await?; + let _ = ctx.schedule_activity("Bump", n.clone()).await?; + + // Roll over if pressure > 0 (which it will be after the first activity). + if ctx.history_pressure() > 0.0 { + let next_gen = generation + 1; + return ctx.continue_as_new(next_gen.to_string()).await; + } + Ok("done".to_string()) + }) + .build(); + + let rt = runtime::Runtime::start_with_options( + store.clone(), activity_registry, orch_reg, opts(false, false), + ).await; + + let client = Client::new(store.clone()); + client.start_orchestration("pressure-can", "PressureOrch", "0").await.unwrap(); + + let status = wait(&client, "pressure-can").await; + assert!( + matches!(status, OrchestrationStatus::Completed { ref output, .. } if output == "done after 3 generations"), + "expected Completed with 3 generations, got {status:?}" + ); + + rt.shutdown(None).await; +} + +// --------------------------------------------------------------------------- +// Pre-existing limits keep Application shape when emit_le=false +// --------------------------------------------------------------------------- + +/// `existing_limit_failures_keep_application_when_emit_off` +/// +/// Triggering MAX_CUSTOM_STATUS_BYTES with `emit_limit_exceeded_errors=false` +/// should produce the legacy `Application { OrchestrationFailed }` shape. +#[tokio::test] +async fn existing_limit_failures_keep_application_when_emit_off() { + let (store, _td) = common::create_sqlite_store_disk().await; + + let activity_registry = ActivityRegistry::builder() + .register("Noop", |_ctx: ActivityContext, _: String| async move { Ok("ok".to_string()) }) + .build(); + + // MAX_CUSTOM_STATUS_BYTES is 256 KiB; a 300 KiB string exceeds it. + let big_status = "s".repeat(300 * 1024); + + let orch_reg = OrchestrationRegistry::builder() + .register("CustomStatusOrch", move |ctx: OrchestrationContext, _: String| { + let s = big_status.clone(); + async move { + ctx.set_custom_status(s); + let _ = ctx.schedule_activity("Noop", String::new()).await?; + Ok("done".to_string()) + } + }) + .build(); + + let rt = runtime::Runtime::start_with_options( + store.clone(), activity_registry, orch_reg, opts(false, false), + ).await; + + let client = Client::new(store.clone()); + client.start_orchestration("big-status-emit-off", "CustomStatusOrch", "").await.unwrap(); + + let status = wait(&client, "big-status-emit-off").await; + + match status { + OrchestrationStatus::Failed { details, .. } => { + assert!( + matches!(&details, ErrorDetails::Application { .. }), + "expected Application shape, got {details:?}" + ); + } + other => panic!("expected Failed, got {other:?}"), + } + + rt.shutdown(None).await; +} + +/// `existing_limit_failures_become_configuration_when_emit_on` +/// +/// Same scenario with `emit_limit_exceeded_errors=true` → `Configuration::LimitExceeded`. +#[tokio::test] +async fn existing_limit_failures_become_configuration_when_emit_on() { + let (store, _td) = common::create_sqlite_store_disk().await; + + let activity_registry = ActivityRegistry::builder() + .register("Noop2", |_ctx: ActivityContext, _: String| async move { Ok("ok".to_string()) }) + .build(); + + let big_status = "s".repeat(300 * 1024); + + let orch_reg = OrchestrationRegistry::builder() + .register("CustomStatusOrch2", move |ctx: OrchestrationContext, _: String| { + let s = big_status.clone(); + async move { + ctx.set_custom_status(s); + let _ = ctx.schedule_activity("Noop2", String::new()).await?; + Ok("done".to_string()) + } + }) + .build(); + + let rt = runtime::Runtime::start_with_options( + store.clone(), activity_registry, orch_reg, opts(false, true), + ).await; + + let client = Client::new(store.clone()); + client.start_orchestration("big-status-emit-on", "CustomStatusOrch2", "").await.unwrap(); + + let status = wait(&client, "big-status-emit-on").await; + + match status { + OrchestrationStatus::Failed { details, .. } => { + assert!( + matches!( + &details, + ErrorDetails::Configuration { kind: ConfigErrorKind::LimitExceeded, .. } + ), + "expected Configuration::LimitExceeded, got {details:?}" + ); + } + other => panic!("expected Failed, got {other:?}"), + } + + rt.shutdown(None).await; +} + +// --------------------------------------------------------------------------- +// Multibyte UTF-8 name length is measured in bytes, not chars +// --------------------------------------------------------------------------- + +/// `multibyte_utf8_name_length` +/// +/// An activity with a 64-emoji (256 UTF-8 bytes) name: accepted. +/// An activity with MAX_SMALL_VALUE_BYTES/4 + 1 emojis (> 64 KiB UTF-8): rejected. +/// +/// Each emoji (🔥) is 4 UTF-8 bytes. MAX_SMALL_VALUE_BYTES = 64 KiB = 65536 bytes. +/// So 65536 / 4 = 16384 emojis = exactly 65536 bytes → OK. +/// 16385 emojis = 65540 bytes → rejected. +#[tokio::test] +async fn multibyte_utf8_name_length_bytes_not_chars() { + let (store, _td) = common::create_sqlite_store_disk().await; + + // Short name (within limit) — 64 emojis = 256 bytes, well under 64 KiB. + let short_name: String = "🔥".repeat(64); + // Long name (over limit by 4 bytes) — 16385 emojis = 65540 bytes > 65536. + let long_name: String = "🔥".repeat(MAX_SMALL_VALUE_BYTES / 4 + 1); + + assert!(short_name.len() < MAX_SMALL_VALUE_BYTES, "short_name should be under limit"); + assert!(long_name.len() > MAX_SMALL_VALUE_BYTES, "long_name should be over limit"); + + let short_name_clone = short_name.clone(); + let long_name_clone = long_name.clone(); + + let activity_registry = ActivityRegistry::builder() + .register(short_name.clone(), |_ctx: ActivityContext, _: String| async move { + Ok("short-ok".to_string()) + }) + .register(long_name.clone(), |_ctx: ActivityContext, _: String| async move { + Ok("long-ok".to_string()) + }) + .build(); + + // Orchestration 1: schedules the short-name activity → should succeed. + let orch_reg = OrchestrationRegistry::builder() + .register("ShortNameOrch", move |ctx: OrchestrationContext, _: String| { + let name = short_name_clone.clone(); + async move { + let r = ctx.schedule_activity(name, String::new()).await?; + Ok(r) + } + }) + .register("LongNameOrch", move |ctx: OrchestrationContext, _: String| { + let name = long_name_clone.clone(); + async move { + let r = ctx.schedule_activity(name, String::new()).await?; + Ok(r) + } + }) + .build(); + + let rt = runtime::Runtime::start_with_options( + store.clone(), activity_registry, orch_reg, opts(true, true), + ).await; + + let client = Client::new(store.clone()); + + // Short name → should complete. + client.start_orchestration("short-name-inst", "ShortNameOrch", "").await.unwrap(); + let status_short = wait(&client, "short-name-inst").await; + assert!( + matches!(status_short, OrchestrationStatus::Completed { .. }), + "short-name activity should complete, got {status_short:?}" + ); + + // Long name → should fail with LimitExceeded (name:activity). + client.start_orchestration("long-name-inst", "LongNameOrch", "").await.unwrap(); + let status_long = wait(&client, "long-name-inst").await; + match status_long { + OrchestrationStatus::Failed { details, .. } => { + assert!( + matches!( + &details, + ErrorDetails::Configuration { + kind: ConfigErrorKind::LimitExceeded, + resource, + .. + } if resource == "name:activity" + ), + "expected LimitExceeded(name:activity), got {details:?}" + ); + } + other => panic!("expected Failed for long-name activity, got {other:?}"), + } + + rt.shutdown(None).await; +} + +// --------------------------------------------------------------------------- +// enforce=true, emit=false: Application shape (mixed-cluster-safe) +// --------------------------------------------------------------------------- + +/// `enforce_on_emit_off_uses_application_shape` +/// +/// Tier-2 violation with `enforce=true, emit_le=false` should produce +/// `Application` shape, not `Configuration::LimitExceeded`. +#[tokio::test] +async fn enforce_on_emit_off_uses_application_shape() { + let (store, _td) = common::create_sqlite_store_disk().await; + + let big_input = "x".repeat(MAX_PAYLOAD_BYTES + 1); + + let activity_registry = ActivityRegistry::builder() + .register("Noop3", |_ctx: ActivityContext, _: String| async move { Ok("ok".to_string()) }) + .build(); + + let orch_reg = OrchestrationRegistry::builder() + .register("AppShapeOrch", move |ctx: OrchestrationContext, _: String| { + let inp = big_input.clone(); + async move { + let _ = ctx.schedule_activity("Noop3", inp).await?; + Ok("done".to_string()) + } + }) + .build(); + + let rt = runtime::Runtime::start_with_options( + store.clone(), activity_registry, orch_reg, opts(true, false), + ).await; + + let client = Client::new(store.clone()); + client.start_orchestration("app-shape-inst", "AppShapeOrch", "").await.unwrap(); + + let status = wait(&client, "app-shape-inst").await; + + match status { + OrchestrationStatus::Failed { details, .. } => { + assert!( + matches!(&details, ErrorDetails::Application { .. }), + "expected Application shape with emit_le=false, got {details:?}" + ); + } + other => panic!("expected Failed, got {other:?}"), + } + + rt.shutdown(None).await; +} + +// --------------------------------------------------------------------------- +// Replay of pre-limit history succeeds +// --------------------------------------------------------------------------- + +/// `replay_of_pre_limit_history_succeeds` +/// +/// History replayed on newer runtime should succeed if the payload was within +/// limits when written. We test this by running with `enforce=off` (simulating +/// pre-limit runtime), then verifying that the completed history is still +/// accessible and its `history_size_bytes` is reported sensibly. +#[tokio::test] +async fn replay_of_pre_limit_history_succeeds() { + let (store, _td) = common::create_sqlite_store_disk().await; + + // Use a moderate payload that would be fine under both old and new runtimes. + let modest_output = "m".repeat(1024); // 1 KiB — well within limits. + + let activity_registry = ActivityRegistry::builder() + .register("ModestOut", move |_ctx: ActivityContext, _: String| { + let out = modest_output.clone(); + async move { Ok(out) } + }) + .build(); + + let orch_reg = OrchestrationRegistry::builder() + .register("ModestOrch", |ctx: OrchestrationContext, _: String| async move { + let r = ctx.schedule_activity("ModestOut", String::new()).await?; + // Record the history size in the output for assertion. + let sz = ctx.history_size_bytes(); + Ok(format!("{r}|size={sz}")) + }) + .build(); + + // Run with enforcement ON — should still complete fine. + let rt = runtime::Runtime::start_with_options( + store.clone(), activity_registry, orch_reg, opts(true, true), + ).await; + + let client = Client::new(store.clone()); + client.start_orchestration("pre-limit-inst", "ModestOrch", "").await.unwrap(); + + let status = wait(&client, "pre-limit-inst").await; + + match status { + OrchestrationStatus::Completed { output, .. } => { + assert!(output.contains("size="), "output should contain size= but got: {output}"); + let size_str = output.split("size=").nth(1).unwrap_or("0"); + let size: usize = size_str.parse().unwrap_or(0); + assert!(size > 0, "history_size_bytes should be > 0 but got {size}"); + assert!(size < MAX_HISTORY_BYTES, "history_size_bytes should be < MAX_HISTORY_BYTES"); + } + other => panic!("expected Completed, got {other:?}"), + } + + rt.shutdown(None).await; +}