18 changes: 16 additions & 2 deletions CLAUDE.md
@@ -46,7 +46,21 @@ Latency floor: timer firing latency is bounded by `max_fetch_interval` (default

### User-owned schema

`make_outbox_table(metadata, table_name="outbox")` returns a `sqlalchemy.Table` attached to the user's `MetaData`. The package never creates or migrates the table — that's Alembic's job — but it **does** declare three indexes on the table itself so Alembic autogenerate brings them up: the partial `(queue, next_attempt_at) WHERE acquired_token IS NULL` backs the fetch CTE's Branch A (unleased rows); the partial `(queue, acquired_at) WHERE acquired_token IS NOT NULL` backs Branch B (expired-lease reclaim); and the partial unique `(queue, timer_id) WHERE timer_id IS NOT NULL` enforces `timer_id` dedup. The fetch CTE's OR is written so each disjunct explicitly carries the matching partial-index predicate as a conjunct — Postgres only uses a partial index when the query implies its WHERE clause, so the naive `acquired_at < cutoff` form would not engage `_lease_idx` and would fall back to seq-scan. Both fetch-side indexes pay write amplification on every claim (the fetch UPDATE writes `acquired_token` and `acquired_at`), traded for bounded scan cost under sustained lease expiry. `validate_schema()` is **opt-in** (call from `/health` or a startup hook, not `broker.start()`) so migrations can run against the same DB without a startup loop. There is **no** `state` column: a row is "available" iff its lease is unset (`acquired_token IS NULL`) or expired (`acquired_at < now() - lease_ttl_seconds`). Terminal failures `DELETE` (no archive, no DLQ).
`make_outbox_table(metadata, table_name="outbox")` returns a `sqlalchemy.Table` attached to the user's `MetaData`. The package never creates or migrates the table — that's Alembic's job — but it **does** declare three indexes on the table itself so Alembic autogenerate brings them up: the partial `(queue, next_attempt_at) WHERE acquired_token IS NULL` backs the fetch CTE's Branch A (unleased rows); the partial `(queue, acquired_at) WHERE acquired_token IS NOT NULL` backs Branch B (expired-lease reclaim); and the partial unique `(queue, timer_id) WHERE timer_id IS NOT NULL` enforces `timer_id` dedup. The fetch CTE's OR is written so each disjunct explicitly carries the matching partial-index predicate as a conjunct — Postgres only uses a partial index when the query implies its WHERE clause, so the naive `acquired_at < cutoff` form would not engage `_lease_idx` and would fall back to seq-scan. Both fetch-side indexes pay write amplification on every claim (the fetch UPDATE writes `acquired_token` and `acquired_at`), traded for bounded scan cost under sustained lease expiry. `validate_schema()` is **opt-in** (call from `/health` or a startup hook, not `broker.start()`) so migrations can run against the same DB without a startup loop. There is **no** `state` column: a row is "available" iff its lease is unset (`acquired_token IS NULL`) or expired (`acquired_at < now() - lease_ttl_seconds`). Terminal failures `DELETE` by default; opt in to audit via `OutboxBroker(..., dlq_table=make_dlq_table(metadata))` — see "Opt-in DLQ" below.
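
The availability rule above can be sketched in plain Python. This is a minimal illustration, not the package's fetch query: the function name `is_available` and its parameters are hypothetical, and it deliberately ignores `next_attempt_at`.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_available(
    acquired_token: Optional[str],
    acquired_at: Optional[datetime],
    now: datetime,
    lease_ttl_seconds: float,
) -> bool:
    """Mirror the lease rule: unleased, or leased but expired."""
    if acquired_token is None:
        # Branch A: no lease is held.
        return True
    # Branch B: the "lease is held" condition sits next to the expiry check,
    # the same shape the text describes for each disjunct of the fetch OR.
    cutoff = now - timedelta(seconds=lease_ttl_seconds)
    return acquired_at is not None and acquired_at < cutoff


now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
fresh_lease = is_available("t", now - timedelta(seconds=10), now, 30)
expired_lease = is_available("t", now - timedelta(seconds=31), now, 30)
```

Here `fresh_lease` is `False` and `expired_lease` is `True`; a row with no token is always available.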

### Opt-in DLQ on terminal failure

`make_dlq_table(metadata, table_name="outbox_dlq")` returns a sibling audit table; pass it to `OutboxBroker(..., dlq_table=...)` to archive terminal failures. Default broker behavior is unchanged when `dlq_table` is None — every existing code path is bit-for-bit identical.

Atomicity is via a single Postgres CTE: `OutboxClient.delete_with_lease` switches to `WITH deleted AS (DELETE … RETURNING …) INSERT INTO <dlq> SELECT … FROM deleted` when configured. One statement preserves the writer-connection autocommit fast path and the lease-token guard; INSERT failure rolls back the DELETE, so the outbox row stays leased and is reclaimed when the lease expires — DLQ misconfiguration surfaces as outbox-table growth + `lease_lost` spikes rather than silent audit loss. Identifiers are quoted via the dialect's `identifier_preparer`; values flow through bind params.

`OutboxInnerMessage.terminal_failure_reason` is set on the three failure paths (`allow_delivery` False → `"max_deliveries"`, `_nack` strategy-exhausted → `"retry_terminal"`, `_reject` → `"rejected"`). `_flush_terminal` reads it to decide whether to build a DLQ payload; `dispatch_one` also reads it to pick the `nacked_terminal(reason=…)` tag value — branch on `terminal_failure_reason` BEFORE `last_exception`, so manual `await msg.reject()` (no exception raised) routes correctly to `nacked_terminal(reason="rejected")` instead of the previously-incorrect `acked`. Success (`_ack`) leaves the field None; success rows never touch the DLQ. **The `DLQFailureReason` `Literal` type (in `message.py`) is the public contract** for this string — operator queries and dashboard labels key off these values, so changing them is API-breaking.
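
The branch order described above can be sketched as follows. The function `pick_metric` is hypothetical, and mapping a non-terminal failure to `nacked_retried` is an assumption made for the illustration; only the three reason strings and the "reason before exception" ordering come from the text.

```python
from typing import Literal, Mapping, Optional, Tuple

DLQFailureReason = Literal["max_deliveries", "retry_terminal", "rejected"]


def pick_metric(
    terminal_failure_reason: Optional[DLQFailureReason],
    last_exception: Optional[BaseException],
) -> Tuple[str, Mapping[str, str]]:
    # Check the terminal reason first: a manual reject sets the reason
    # without raising, so last_exception is None on that path.
    if terminal_failure_reason is not None:
        return "nacked_terminal", {"reason": terminal_failure_reason}
    if last_exception is not None:
        # Assumption: a non-terminal failure was rescheduled for retry.
        return "nacked_retried", {}
    return "acked", {}


manual_reject = pick_metric("rejected", None)
```

Checking `last_exception` first would classify `manual_reject` as `acked`, which is the misrouting the paragraph above describes.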

`last_exception` is serialized via `repr()` and bounded by `_LAST_EXCEPTION_MAX_CHARS=8192` in `subscriber/usecase.py`. Some exceptions carry MB-scale payloads (validation errors with the full request body, `asyncpg.DataError` with the rejected row); an unbounded `repr` would extend the writer round-trip on a poison row and bloat the DLQ. Truncation appends `…[truncated]`. The DLQ `failure_reason` column is `String(64)` (current literals fit in 14 bytes; the breathing room lets the canonical set grow without a column-widening migration).
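
A minimal sketch of the bounding step, assuming the first `max_chars` characters are kept and the marker is appended afterwards (so the result can exceed the limit by the marker's length). The helper name `bounded_repr` is hypothetical; the real logic lives in `subscriber/usecase.py` and may differ.

```python
MAX_CHARS = 8192  # mirrors _LAST_EXCEPTION_MAX_CHARS from the text
MARKER = "…[truncated]"


def bounded_repr(exc: BaseException, max_chars: int = MAX_CHARS) -> str:
    text = repr(exc)
    if len(text) <= max_chars:
        return text
    # Assumption: slice first, then append the marker.
    return text[:max_chars] + MARKER


small = bounded_repr(ValueError("boom"))
big = bounded_repr(ValueError("x" * 20000))
```

`small` is returned unchanged, while `big` is cut to 8192 characters plus the marker.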

There is no built-in retention/pruning. Operators are responsible for archival — suggested pattern: partition the DLQ by `failed_at` and drop old partitions via a cron job.

User-facing reference: `docs/usage/dlq.md`.

`validate_schema()` delegates to `alembic.autogenerate.compare_metadata` against a throwaway `MetaData` populated by `make_outbox_table(...)` — so the canonical `Table` is the single source of truth and the validator never duplicates the schema declaration. It only flags **missing** schema (`add_*` / `modify_*` ops); `remove_*` ops are intentionally ignored so users may attach extras (audit columns, their own indexes). Alembic is an **optional dependency** (`faststream-outbox[validate]`); without it, `validate_schema()` raises `ImportError`, but every other code path works (the import lives at the top of `client.py` inside a try/except, with module-level sentinels `_alembic_compare_metadata` / `_AlembicMigrationContext` set to `None` on failure).
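
The "missing schema only" filter can be illustrated on hand-written data. This sketch assumes the diff list has the shape Alembic documents for `compare_metadata`: tuples whose first element is the op name, with column modifications grouped into a list of tuples. The function name `missing_schema_ops` and the sample values are hypothetical, and real entries carry SQLAlchemy objects rather than strings.

```python
from typing import Any, Iterable, List, Tuple


def missing_schema_ops(diffs: Iterable[Any]) -> List[Tuple[Any, ...]]:
    """Keep add_*/modify_* ops and drop remove_* ops."""
    kept: List[Tuple[Any, ...]] = []
    for entry in diffs:
        # Column modifications arrive grouped as a list of tuples.
        ops = entry if isinstance(entry, list) else [entry]
        for op in ops:
            if op[0].startswith(("add_", "modify_")):
                kept.append(op)
    return kept


sample = [
    ("add_index", "outbox_lease_idx"),           # missing index: flagged
    ("remove_column", None, "outbox", "note"),   # user extra: ignored
    [("modify_nullable", None, "outbox", "queue", {}, True, False)],
]
flagged = missing_schema_ops(sample)
```

Only the `add_index` and `modify_nullable` entries survive, so a user-added column never fails validation.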

@@ -131,7 +145,7 @@ The caller owns the `AsyncEngine`. `OutboxBrokerConfig.disconnect()` deliberatel

### Metrics seam (`metrics/__init__.py`)

`OutboxBroker(..., metrics_recorder=...)` accepts a `MetricsRecorder = Callable[[str, Mapping[str, Any]], None]`. The default (`_noop_recorder`) lets instrumentation sites call unconditionally. The recorder threads through `OutboxBrokerConfig.metrics_recorder` to two places: the subscriber's six emission points (`fetched`, `dispatched`, `acked`, `nacked_retried`, `nacked_terminal`, `lease_lost`) via `OutboxSubscriber._emit_metric`, and the producer's single emission point (`published`) via `OutboxProducer._emit_metric`. The producer reads the recorder from its own constructor kwarg (passed in alongside the config field) so the canonical insert path doesn't have to reach through the broker config at call time.
`OutboxBroker(..., metrics_recorder=...)` accepts a `MetricsRecorder = Callable[[str, Mapping[str, Any]], None]`. The default (`_noop_recorder`) lets instrumentation sites call unconditionally. The recorder threads through `OutboxBrokerConfig.metrics_recorder` to two places: the subscriber's emission points (`fetched`, `dispatched`, `acked`, `nacked_retried`, `nacked_terminal`, `lease_lost`, plus `dlq_written` when `dlq_table` is configured) via `OutboxSubscriber._emit_metric`, and the producer's single emission point (`published`) via `OutboxProducer._emit_metric`. The producer reads the recorder from its own constructor kwarg (passed in alongside the config field) so the canonical insert path doesn't have to reach through the broker config at call time. `dlq_written` and `nacked_terminal` are complementary — alert on a divergence between the two rates to catch DLQ misconfiguration without silent audit loss.
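
One way to watch for the divergence mentioned above is an in-memory counting recorder. This is a sketch under the assumption that both counters advance together while the DLQ is healthy; `counting_recorder` and `dlq_gap` are hypothetical names, and only the recorder's call signature comes from the text.

```python
from collections import Counter
from typing import Any, Mapping

counts: Counter = Counter()


def counting_recorder(event: str, tags: Mapping[str, Any]) -> None:
    # Non-blocking: an in-memory increment only.
    counts[event] += 1


def dlq_gap(c: Counter) -> int:
    """Terminal failures that have no matching DLQ write so far."""
    return c["nacked_terminal"] - c["dlq_written"]


counting_recorder("nacked_terminal", {"reason": "rejected"})
counting_recorder("dlq_written", {})
counting_recorder("nacked_terminal", {"reason": "max_deliveries"})
gap = dlq_gap(counts)
```

A gap that keeps growing would be the signal to alert on; in production the same comparison would more likely be a rate query in the metrics backend.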

Every call site wraps the recorder in `try/except` and logs at DEBUG — a broken recorder never poisons the dispatch loop. The recorder is called from the event loop and **must not block**; sync `Counter.inc()` is fine, blocking HTTP/StatsD calls are not. The library does not wrap user recorders in `asyncio.to_thread` — that would destroy ordering and create per-event task explosion.
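
The guarded call described above might look like the following. This is a sketch, not the library's `_emit_metric`: `emit_safely`, the logger name, and the `queue` tag are assumptions for the example.

```python
import logging
from typing import Any, Callable, List, Mapping, Tuple

MetricsRecorder = Callable[[str, Mapping[str, Any]], None]
logger = logging.getLogger("outbox.metrics.sketch")


def emit_safely(recorder: MetricsRecorder, event: str, tags: Mapping[str, Any]) -> None:
    try:
        recorder(event, tags)
    except Exception:
        # A broken recorder is logged and swallowed so dispatch continues.
        logger.debug("metrics recorder failed for %r", event, exc_info=True)


def broken_recorder(event: str, tags: Mapping[str, Any]) -> None:
    raise RuntimeError("exporter down")


seen: List[Tuple[str, dict]] = []
emit_safely(broken_recorder, "acked", {})  # does not raise
emit_safely(lambda e, t: seen.append((e, dict(t))), "fetched", {"queue": "q"})
```

The wrapper stays synchronous on purpose, matching the text's point that the recorder runs on the event loop and is not moved to a thread.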

4 changes: 2 additions & 2 deletions README.md
@@ -34,7 +34,7 @@ async with session_factory() as session, session.begin():

## How it works

A subscriber owns two async loops: a **fetch** loop claims available rows via a single CTE (`SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *`), and `max_workers` **worker** loops dispatch to the handler. On success, `DELETE WHERE id=:id AND acquired_token=:token`; on failure, the retry strategy schedules another attempt or terminally drops the row.
A subscriber owns two async loops: a **fetch** loop claims available rows via a single CTE (`SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *`), and `max_workers` **worker** loops dispatch to the handler. On success, `DELETE WHERE id=:id AND acquired_token=:token`; on failure, the retry strategy schedules another attempt or terminally drops the row. Terminal failures `DELETE` by default; pass `dlq_table=make_dlq_table(metadata)` to atomically archive them into a sibling audit table instead — see [Dead-letter queue](https://faststream-outbox.readthedocs.io/en/latest/usage/dlq/).
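
The two-loop shape can be sketched with plain `asyncio`. Everything here is a stand-in: a Python list replaces the outbox table, appending to `handled` replaces the user handler, and the `asyncio.Queue` handoff between the loops is an assumption rather than a description of the package's internals.

```python
import asyncio
from typing import List


async def run_once(rows: List[int], max_workers: int = 3) -> List[int]:
    queue: asyncio.Queue = asyncio.Queue()
    handled: List[int] = []

    async def fetch_loop() -> None:
        # Stand-in for the claim CTE: hand every pending row to the workers.
        for row in rows:
            await queue.put(row)

    async def worker_loop() -> None:
        while True:
            row = await queue.get()
            try:
                handled.append(row)  # stand-in for the user handler
            finally:
                queue.task_done()

    workers = [asyncio.create_task(worker_loop()) for _ in range(max_workers)]
    await fetch_loop()
    await queue.join()  # wait until every claimed row is handled
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return handled


result = asyncio.run(run_once([1, 2, 3, 4, 5]))
```

Each row is handled exactly once, although the order across workers is not guaranteed in general.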

The `acquired_token` is the load-bearing invariant: a slow handler whose lease expired and was re-claimed by another worker finds its terminal `DELETE` to be a no-op (the token no longer matches), preventing it from clobbering the new lease holder.
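
The token guard can be demonstrated with the standard-library `sqlite3` module. SQLite stands in for Postgres here, and the two-column table and token values are made up for the example; only the shape of the guarded `DELETE` comes from the text.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE outbox (id INTEGER PRIMARY KEY, acquired_token TEXT)")
conn.execute("INSERT INTO outbox (id, acquired_token) VALUES (1, 'token-a')")

# Worker A's lease expires and worker B re-claims the row with a new token.
conn.execute("UPDATE outbox SET acquired_token = 'token-b' WHERE id = 1")

GUARDED_DELETE = "DELETE FROM outbox WHERE id = :id AND acquired_token = :token"

# Worker A finishes late: its token no longer matches, so nothing is deleted.
stale = conn.execute(GUARDED_DELETE, {"id": 1, "token": "token-a"}).rowcount

# Worker B holds the current lease, so its delete removes the row.
current = conn.execute(GUARDED_DELETE, {"id": 1, "token": "token-b"}).rowcount
remaining = conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
```

The stale delete affects zero rows and the current one affects exactly one, so the late worker cannot remove a row it no longer owns.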

@@ -52,7 +52,7 @@ See [How it works](https://faststream-outbox.readthedocs.io/en/latest/introducti

## Acknowledgements

The architecture of this package is heavily informed by Arseniy Popov's [PR #2704](https://github.com/ag2ai/faststream/pull/2704) (`feat: add sqla broker`) on upstream FastStream — the FastStream broker/registrator/subscriber wiring, the `SELECT … FOR UPDATE SKIP LOCKED` fetch-and-claim CTE, the retry strategy hierarchy, and the in-transaction publish contract all originate from there. This package is a Postgres-only reimplementation that diverges in storage model (lease tokens instead of an explicit state column, no archive table), loop structure (two loops instead of four), wake-up mechanism (`LISTEN/NOTIFY`), and adds timer mechanics. Credit for the original design belongs to Arseniy.
The architecture of this package is heavily informed by Arseniy Popov's [PR #2704](https://github.com/ag2ai/faststream/pull/2704) (`feat: add sqla broker`) on upstream FastStream — the FastStream broker/registrator/subscriber wiring, the `SELECT … FOR UPDATE SKIP LOCKED` fetch-and-claim CTE, the retry strategy hierarchy, and the in-transaction publish contract all originate from there. This package is a Postgres-only reimplementation that diverges in storage model (lease tokens instead of an explicit state column, archive table is opt-in), loop structure (two loops instead of four), wake-up mechanism (`LISTEN/NOTIFY`), and adds timer mechanics. Credit for the original design belongs to Arseniy.

## Part of `modern-python`

1 change: 1 addition & 0 deletions docs/index.md
@@ -12,6 +12,7 @@ A producer writes a domain entity and an outbox row in the *same* SQLAlchemy tra
- [How it works](introduction/how-it-works.md)
- [Basic usage](usage/basic.md)
- [Subscriber](usage/subscriber.md)
- [Dead-letter queue](usage/dlq.md)
- [Publisher](usage/publisher.md)
- [Router](usage/router.md)
- [FastAPI integration](usage/fastapi.md)