From d207e9fd0257201b510c5bdecf119f77e44420ff Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Sat, 30 May 2026 09:14:39 +0300 Subject: [PATCH 1/3] feat: opt-in DLQ on terminal failure (issue #26) Adds make_dlq_table() and OutboxBroker(..., dlq_table=...) so terminal failures copy payload + headers + last-exception summary into a DLQ table in the same Postgres statement as the outbox DELETE (CTE), with a dlq_written metric. Off by default; bit-for-bit identical when unset. Co-Authored-By: Claude Opus 4.7 (1M context) --- faststream_outbox/__init__.py | 3 +- faststream_outbox/broker.py | 7 +- faststream_outbox/client.py | 131 +++++++++++-- faststream_outbox/configs.py | 5 + faststream_outbox/message.py | 8 + faststream_outbox/schema.py | 34 ++++ faststream_outbox/subscriber/usecase.py | 28 ++- faststream_outbox/testing.py | 27 +++ tests/conftest.py | 15 +- tests/test_fake.py | 174 ++++++++++++++++- tests/test_integration.py | 129 +++++++++++++ tests/test_unit.py | 245 +++++++++++++++++++++++- 12 files changed, 782 insertions(+), 24 deletions(-) diff --git a/faststream_outbox/__init__.py b/faststream_outbox/__init__.py index e8e6ec0..93c16d4 100644 --- a/faststream_outbox/__init__.py +++ b/faststream_outbox/__init__.py @@ -18,7 +18,7 @@ RetryStrategyProto, ) from faststream_outbox.router import OutboxRouter -from faststream_outbox.schema import make_outbox_table +from faststream_outbox.schema import make_dlq_table, make_outbox_table from faststream_outbox.testing import TestOutboxBroker @@ -35,6 +35,7 @@ "OutboxRouter", "RetryStrategyProto", "TestOutboxBroker", + "make_dlq_table", "make_outbox_table", ] diff --git a/faststream_outbox/broker.py b/faststream_outbox/broker.py index 075e37d..05e8f11 100644 --- a/faststream_outbox/broker.py +++ b/faststream_outbox/broker.py @@ -119,11 +119,12 @@ class OutboxBroker( _subscribers: list["OutboxSubscriber"] - def __init__( + def __init__( # noqa: PLR0913 self, engine: "AsyncEngine | None" = None, *, outbox_table: "Table", + dlq_table: "Table | None" = None, decoder: CustomCallable | None = None, parser: CustomCallable | None = None, dependencies: Iterable["Dependant"] = (), @@ -143,7 +144,8 @@ def __init__( tags: Iterable[Tag | TagDict] = (), ) -> None: self._outbox_table = outbox_table - client = OutboxClient(engine, outbox_table) if engine is not None else None + self._dlq_table = dlq_table + client = OutboxClient(engine, outbox_table, dlq_table=dlq_table) if engine is not None else None fd_config = FastDependsConfig(use_fastdepends=apply_types, serializer=serializer) recorder: MetricsRecorder = metrics_recorder or _noop_recorder producer = OutboxProducer( @@ -156,6 +158,7 @@ def __init__( engine=engine, client=client, metrics_recorder=recorder, + dlq_table=dlq_table, broker_middlewares=(_CaptureExceptionMiddleware, *middlewares), broker_parser=parser, broker_decoder=decoder, diff --git a/faststream_outbox/client.py b/faststream_outbox/client.py index af8418a..ec727ad 100644 --- a/faststream_outbox/client.py +++ b/faststream_outbox/client.py @@ -38,12 +38,12 @@ # don't call validate_schema() never trigger the runtime import path. from faststream_outbox._import_checker import is_alembic_installed from faststream_outbox.message import OutboxInnerMessage -from faststream_outbox.schema import make_outbox_table +from faststream_outbox.schema import make_dlq_table, make_outbox_table if TYPE_CHECKING: import typing - from collections.abc import Mapping, Sequence + from collections.abc import Callable, Mapping, Sequence from alembic.autogenerate import compare_metadata as _alembic_compare_metadata from alembic.migration import MigrationContext as _AlembicMigrationContext @@ -95,6 +95,8 @@ async def delete_with_lease( conn: "AsyncConnection | None", message_id: int, acquired_token: uuid.UUID, + *, + dlq_payload: "Mapping[str, typing.Any] | None" = None, ) -> bool: ... @abc.abstractmethod @@ -118,9 +120,16 @@ async def ping(self) -> bool: ... class OutboxClient(AbstractOutboxClient): - def __init__(self, engine: "AsyncEngine", outbox_table: "Table") -> None: + def __init__( + self, + engine: "AsyncEngine", + outbox_table: "Table", + *, + dlq_table: "Table | None" = None, + ) -> None: self._engine = engine self._table = outbox_table + self._dlq_table = dlq_table @property def table(self) -> "Table": @@ -211,6 +220,8 @@ async def delete_with_lease( conn: "AsyncConnection | None", message_id: int, acquired_token: uuid.UUID, + *, + dlq_payload: "Mapping[str, typing.Any] | None" = None, ) -> bool: """ Delete *message_id* iff it still holds *acquired_token*. Returns True if deleted. @@ -223,15 +234,89 @@ async def delete_with_lease( ``rowcount == 0`` and is silently dropped, whether *conn* is in autocommit (production path) or in an outer transaction (tests). See :meth:`fetch` for why *conn* is ``AsyncConnection | None`` instead of ``AsyncConnection``. + + When *dlq_payload* is provided **and** the client was constructed with a + ``dlq_table``, the statement becomes a single CTE that DELETEs the outbox row + and INSERTs the audit copy into the DLQ atomically: + + WITH deleted AS ( + DELETE FROM WHERE id=:id AND acquired_token=:token + RETURNING id, queue, payload, headers, deliveries_count, created_at + ) + INSERT INTO (original_id, queue, payload, headers, deliveries_count, + created_at, failure_reason, last_exception) + SELECT id, queue, payload, headers, deliveries_count, created_at, + :failure_reason, :last_exception + FROM deleted; + + Lease-lost ⇒ ``deleted`` is empty ⇒ INSERT inserts nothing ⇒ ``rowcount == 0``, + same observable as the no-DLQ path. A DLQ-write failure (schema mismatch, + disk full) rolls back the whole statement, so the outbox row stays leased + and is reclaimed when the lease expires — DLQ misconfiguration surfaces as + outbox growth + ``lease_lost`` spikes rather than silently dropping audit + data. ``dlq_payload`` carries ``{"failure_reason": str, "last_exception": + str | None}``; the keys are required. """ if conn is None: msg = "OutboxClient.delete_with_lease requires a live AsyncConnection (got None)" raise TypeError(msg) + if dlq_payload is not None and self._dlq_table is not None: + stmt, params = self._build_dlq_cte_stmt(message_id, acquired_token, dlq_payload) + result = await conn.execute(stmt, params) + return (result.rowcount or 0) > 0 t = self._table - stmt = delete(t).where(t.c.id == message_id, t.c.acquired_token == acquired_token) - result = await conn.execute(stmt) + del_stmt = delete(t).where(t.c.id == message_id, t.c.acquired_token == acquired_token) + result = await conn.execute(del_stmt) return (result.rowcount or 0) > 0 + def _build_dlq_cte_stmt( + self, + message_id: int, + acquired_token: uuid.UUID, + dlq_payload: "Mapping[str, typing.Any]", + ) -> "tuple[typing.Any, dict[str, typing.Any]]": + """ + Compose the single-statement DLQ CTE plus the parameter dict. + + Identifiers are quoted via the dialect's identifier preparer so reserved words + and odd characters survive interpolation. The outbox/DLQ table names are + application-controlled (from the user's ``MetaData``), not request-derived + input, so the quoting is a robustness/correctness safeguard not a security + boundary. + """ + # ``self._dlq_table`` is guaranteed non-None at the only call site (the guard + # in ``delete_with_lease``); local alias narrows the type for the formatting. + dlq_table = self._dlq_table + assert dlq_table is not None # noqa: S101 + preparer = self._engine.dialect.identifier_preparer + outbox_name = preparer.quote(self._table.name) + dlq_name = preparer.quote(dlq_table.name) + # S608: outbox_name / dlq_name come from application-defined SQLAlchemy + # Table objects (not request input) and are quoted via the dialect's + # identifier preparer — values flow through :bindparam placeholders. + cte_sql = ( + f"WITH deleted AS (" # noqa: S608 + f"DELETE FROM {outbox_name} " + f"WHERE id = :message_id AND acquired_token = :acquired_token " + f"RETURNING id, queue, payload, headers, deliveries_count, created_at" + f") " + f"INSERT INTO {dlq_name} (" + f"original_id, queue, payload, headers, deliveries_count, created_at, " + f"failure_reason, last_exception" + f") " + f"SELECT id, queue, payload, headers, deliveries_count, created_at, " + f":failure_reason, :last_exception " + f"FROM deleted" + ) + sql = text(cte_sql) + params = { + "message_id": message_id, + "acquired_token": acquired_token, + "failure_reason": dlq_payload["failure_reason"], + "last_exception": dlq_payload["last_exception"], + } + return sql, params + async def mark_pending_with_lease( self, conn: "AsyncConnection | None", @@ -277,14 +362,17 @@ async def mark_pending_with_lease( async def validate_schema(self) -> None: """ - Validate that the database table matches the package's expected columns. + Validate that the database table(s) match the package's expected columns. - Raises ``RuntimeError`` listing every mismatch. Opt-in: call from your startup - hook or ``/health`` endpoint, not from ``broker.start()`` (so Alembic can run + Raises ``RuntimeError`` listing every mismatch across the outbox table and, + when configured, the DLQ table. Opt-in: call from your startup hook or + ``/health`` endpoint, not from ``broker.start()`` (so Alembic can run migrations against the same DB without blocking startup). """ async with self._engine.connect() as conn: errors = await conn.run_sync(_validate_schema_sync, self._table) + if self._dlq_table is not None: + errors.extend(await conn.run_sync(_validate_dlq_schema_sync, self._dlq_table)) if errors: msg = "Outbox schema mismatch: " + "; ".join(errors) raise RuntimeError(msg) @@ -316,13 +404,28 @@ def _row_to_message(row: dict) -> OutboxInnerMessage: def _validate_schema_sync(connection: "Connection", table: "Table") -> list[str]: + """Run the outbox-table validation pass; see :func:`_run_validate` for the diff machinery.""" + return _run_validate(connection, table, make_outbox_table) + + +def _validate_dlq_schema_sync(connection: "Connection", table: "Table") -> list[str]: + """Run the DLQ-table validation pass; see :func:`_run_validate` for the diff machinery.""" + return _run_validate(connection, table, make_dlq_table) + + +def _run_validate( + connection: "Connection", + table: "Table", + canonical_factory: "Callable[[MetaData, str], Table]", +) -> list[str]: """ Run Alembic's autogenerate diff against the live DB and surface any "missing schema" drift. - The canonical schema is whatever ``make_outbox_table`` produces — the same Table the user - attaches to their own ``MetaData``. Delegating to Alembic avoids re-implementing column / - index comparison logic (which would diverge from the declaration over time) and keeps the - package out of the schema-management business that Alembic already owns. + The canonical schema is whatever ``canonical_factory`` produces — the same Table the user + attaches to their own ``MetaData`` via ``make_outbox_table`` / ``make_dlq_table``. Delegating + to Alembic avoids re-implementing column / index comparison logic (which would diverge from + the declaration over time) and keeps the package out of the schema-management business that + Alembic already owns. ``add_*`` and ``modify_*`` ops fail validation (the DB is missing or has the wrong shape for something the broker needs). ``remove_*`` ops are ignored — the user may have extra columns @@ -341,10 +444,10 @@ def _validate_schema_sync(connection: "Connection", table: "Table") -> list[str] msg = "validate_schema() requires alembic. Install with `pip install faststream-outbox[validate]`." raise ImportError(msg) - # Isolated MetaData containing ONLY the canonical outbox table, so the user's + # Isolated MetaData containing ONLY the canonical table, so the user's # domain tables (in their own MetaData) don't show up in the diff. canonical_metadata = MetaData() - make_outbox_table(canonical_metadata, table_name=table.name) + canonical_factory(canonical_metadata, table.name) def _include_name(name: str | None, type_: str, parent_names: "Mapping[str, str | None]") -> bool: if type_ == "schema": diff --git a/faststream_outbox/configs.py b/faststream_outbox/configs.py index 434e0d4..9802571 100644 --- a/faststream_outbox/configs.py +++ b/faststream_outbox/configs.py @@ -15,6 +15,7 @@ if typing.TYPE_CHECKING: + from sqlalchemy import Table from sqlalchemy.ext.asyncio import AsyncEngine from faststream_outbox.client import AbstractOutboxClient @@ -25,6 +26,10 @@ class OutboxBrokerConfig(BrokerConfig): engine: "AsyncEngine | None" = None client: "AbstractOutboxClient | None" = None metrics_recorder: MetricsRecorder = _noop_recorder + # When non-None, terminal failures (max_deliveries / retry_terminal / rejected) + # copy audit data into this table in the same statement as the outbox DELETE. + # See ``OutboxClient.delete_with_lease`` for the CTE shape. + dlq_table: "Table | None" = None async def connect(self) -> None: # Engine and client are wired up by the broker's constructor; nothing to do here. diff --git a/faststream_outbox/message.py b/faststream_outbox/message.py index f1dd7f3..1ab87dd 100644 --- a/faststream_outbox/message.py +++ b/faststream_outbox/message.py @@ -60,6 +60,11 @@ class OutboxInnerMessage: # Set by ``_nack`` when the strategy schedules a retry; consumed by the # subscriber's ``_flush_retry`` to drive ``mark_pending_with_lease``. pending_delay_seconds: float | None = field(default=None, init=False) + # Set on terminal-failure paths (``allow_delivery`` False, ``_nack`` exhausted, + # ``_reject``). ``_flush_terminal`` reads it to decide whether to build a DLQ + # payload. Stays ``None`` on the success (``_ack``) path so handler-success + # never touches the DLQ. + terminal_failure_reason: str | None = field(default=None, init=False) async def ack(self) -> None: await self._update_state_if_not_set(self._ack) @@ -94,12 +99,14 @@ async def _nack(self) -> None: ) if delay is None: self.to_delete = True + self.terminal_failure_reason = "retry_terminal" else: self.pending_delay_seconds = delay async def _reject(self) -> None: self._record_attempt() self.to_delete = True + self.terminal_failure_reason = "rejected" def _record_attempt(self) -> None: self.attempts_count += 1 @@ -113,6 +120,7 @@ def allow_delivery(self, *, max_deliveries: int | None, logger: "LoggerProto | N if max_deliveries is not None and self.deliveries_count > max_deliveries: self.to_delete = True self.state_set = True + self.terminal_failure_reason = "max_deliveries" if logger is not None: logger.log( logging.ERROR, diff --git a/faststream_outbox/schema.py b/faststream_outbox/schema.py index 2e7b8cc..c134fb8 100644 --- a/faststream_outbox/schema.py +++ b/faststream_outbox/schema.py @@ -105,3 +105,37 @@ def make_outbox_table(metadata: "MetaData", table_name: str = "outbox") -> Table postgresql_where=table.c.acquired_token.is_not(None), ) return table + + +def make_dlq_table(metadata: "MetaData", table_name: str = "outbox_dlq") -> Table: + """ + Build the dead-letter-queue ``Table`` and attach it to *metadata*. + + Opt-in companion to :func:`make_outbox_table`. Pass the returned table to + ``OutboxBroker(..., dlq_table=...)`` to enable archive-on-terminal-failure: the + broker copies ``payload`` / ``headers`` / failure context into this table in the + same Postgres statement as the outbox ``DELETE`` (atomic via CTE), so audit data + survives even if the worker crashes between the DELETE and a follow-up insert. + + No FK to the outbox table — the row is gone in the same transaction, so the + constraint would be unsatisfiable. ``original_id`` is a plain BigInteger for + operator forensics; not unique (re-delivered ``timer_id`` rows could legitimately + fail twice). No LISTEN/NOTIFY channel — nobody polls the DLQ, so the 63-byte + identifier check in :func:`make_outbox_table` does not apply here. + """ + table = Table( + table_name, + metadata, + Column("id", BigInteger, primary_key=True, autoincrement=True), + Column("original_id", BigInteger, nullable=False), + Column("queue", String(255), nullable=False), + Column("payload", LargeBinary, nullable=False), + Column("headers", JSONB, nullable=True), + Column("deliveries_count", BigInteger, nullable=False), + Column("created_at", DateTime(timezone=True), nullable=False), + Column("failed_at", DateTime(timezone=True), nullable=False, server_default=func.now()), + Column("failure_reason", String(32), nullable=False), + Column("last_exception", String, nullable=True), + ) + Index(f"{table_name}_queue_failed_idx", table.c.queue, table.c.failed_at) + return table diff --git a/faststream_outbox/subscriber/usecase.py b/faststream_outbox/subscriber/usecase.py index acef932..b034ea3 100644 --- a/faststream_outbox/subscriber/usecase.py +++ b/faststream_outbox/subscriber/usecase.py @@ -540,7 +540,22 @@ async def _flush_terminal( ) -> None: if row.acquired_token is None: return - deleted = await self._client.delete_with_lease(writer_conn, row.id, row.acquired_token) + # Build the DLQ payload only when this row is terminal-by-failure AND the + # broker is configured with a DLQ table. Success-by-ack rows reach this + # method too (terminal=True via to_delete) but carry + # ``terminal_failure_reason is None`` and must not land in the DLQ. + dlq_payload: dict[str, typing.Any] | None = None + if row.terminal_failure_reason is not None and self._outer_config.dlq_table is not None: + dlq_payload = { + "failure_reason": row.terminal_failure_reason, + "last_exception": repr(row.last_exception) if row.last_exception is not None else None, + } + deleted = await self._client.delete_with_lease( + writer_conn, + row.id, + row.acquired_token, + dlq_payload=dlq_payload, + ) if not deleted: self._log( log_level=logging.WARNING, @@ -562,6 +577,17 @@ async def _flush_terminal( "deliveries_count": row.deliveries_count, }, ) + return + if dlq_payload is not None: + self._emit_metric( + "dlq_written", + { + **self._base_tags(row.queue), + "deliveries_count": row.deliveries_count, + "failure_reason": row.terminal_failure_reason, + "exception_type": (type(row.last_exception).__name__ if row.last_exception is not None else None), + }, + ) async def _flush_retry( self, diff --git a/faststream_outbox/testing.py b/faststream_outbox/testing.py index 6245b79..dc8bf30 100644 --- a/faststream_outbox/testing.py +++ b/faststream_outbox/testing.py @@ -66,6 +66,10 @@ class FakeOutboxClient(AbstractOutboxClient): def __init__(self) -> None: self._rows: list[_FakeRow] = [] self._next_id = 1 + # Populated when ``delete_with_lease`` receives a ``dlq_payload``. Mirrors + # the real client's CTE side-effect: outbox row gone + DLQ row created in + # the same call. Tests assert against ``broker.fake_client.dlq_rows``. + self._dlq_rows: list[dict[str, typing.Any]] = [] def feed( self, @@ -96,6 +100,11 @@ def feed( def rows(self) -> list[_FakeRow]: return self._rows + @property + def dlq_rows(self) -> list[dict[str, typing.Any]]: + """Audit copies produced by ``delete_with_lease(..., dlq_payload=...)``.""" + return self._dlq_rows + @property def table(self) -> typing.Any: return None @@ -141,9 +150,27 @@ async def delete_with_lease( conn: typing.Any, # noqa: ARG002 message_id: int, acquired_token: uuid.UUID, + *, + dlq_payload: "typing.Mapping[str, typing.Any] | None" = None, ) -> bool: for i, row in enumerate(self._rows): if row.id == message_id and row.acquired_token == acquired_token: + if dlq_payload is not None: + # Mirror the real CTE side-effect: DLQ row materializes in the + # same call as the DELETE, before the row is removed. + self._dlq_rows.append( + { + "original_id": row.id, + "queue": row.queue, + "payload": row.payload, + "headers": row.headers, + "deliveries_count": row.deliveries_count, + "created_at": row.created_at, + "failed_at": _utcnow(), + "failure_reason": dlq_payload["failure_reason"], + "last_exception": dlq_payload["last_exception"], + }, + ) del self._rows[i] return True return False diff --git a/tests/conftest.py b/tests/conftest.py index 3bfc8c2..6feb704 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,7 +6,7 @@ from sqlalchemy import MetaData, Table from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine -from faststream_outbox import make_outbox_table +from faststream_outbox import make_dlq_table, make_outbox_table PG_DSN = os.environ.get("POSTGRES_DSN", "postgresql+asyncpg://outbox:outbox@localhost:5432/outbox") @@ -41,3 +41,16 @@ async def outbox_table(pg_engine: AsyncEngine) -> AsyncIterator[Table]: yield table async with pg_engine.begin() as conn: await conn.run_sync(metadata.drop_all) + + +@pytest.fixture +async def dlq_table(pg_engine: AsyncEngine) -> AsyncIterator[Table]: + """Per-test DLQ table on its own MetaData so it lives independently of the outbox fixture.""" + metadata = MetaData() + table_name = f"test_dlq_{uuid.uuid4().hex[:12]}" + table = make_dlq_table(metadata, table_name=table_name) + async with pg_engine.begin() as conn: + await conn.run_sync(metadata.create_all) + yield table + async with pg_engine.begin() as conn: + await conn.run_sync(metadata.drop_all) diff --git a/tests/test_fake.py b/tests/test_fake.py index 9ad29ab..998afd9 100644 --- a/tests/test_fake.py +++ b/tests/test_fake.py @@ -21,6 +21,7 @@ OutboxRouter, RetryStrategyProto, TestOutboxBroker, + make_dlq_table, make_outbox_table, ) from faststream_outbox.annotations import ( @@ -460,7 +461,7 @@ async def test_fake_broker_publish_invokes_flush_terminal_when_lease_lost() -> N """``delete_with_lease`` returning False is logged and skipped, not raised.""" class LeaseLostClient(FakeOutboxClient): - async def delete_with_lease(self, conn: object, message_id: int, acquired_token: uuid.UUID) -> bool: # noqa: ARG002 + async def delete_with_lease(self, *args: object, **kwargs: object) -> bool: # noqa: ARG002 return False broker = _make_broker() @@ -510,12 +511,19 @@ def __init__(self) -> None: super().__init__() self.calls = 0 - async def delete_with_lease(self, conn: object, message_id: int, acquired_token: uuid.UUID) -> bool: + async def delete_with_lease( + self, + conn: object, + message_id: int, + acquired_token: uuid.UUID, + *, + dlq_payload: "typing.Mapping[str, typing.Any] | None" = None, + ) -> bool: self.calls += 1 if self.calls == 1: msg = "delete blew up" raise RuntimeError(msg) - return await super().delete_with_lease(conn, message_id, acquired_token) + return await super().delete_with_lease(conn, message_id, acquired_token, dlq_payload=dlq_payload) broker = _make_broker() received: list[str] = [] @@ -1285,3 +1293,163 @@ async def handle( assert isinstance(captured["producer"], FakeOutboxProducer) assert isinstance(captured["client"], AbstractOutboxClient) assert captured["client"] is test_broker.fake_client + + +# --- DLQ end-to-end through TestOutboxBroker ----------------------------------- + + +def _make_broker_with_dlq( + *, + recorder: typing.Any = None, + max_deliveries: int | None = None, + retry_strategy: RetryStrategyProto | None = None, + ack_policy: AckPolicy | None = None, +) -> tuple[OutboxBroker, list[str], list[Exception]]: + """Build a broker with DLQ wired in. Returns (broker, received_payloads, raised).""" + metadata = MetaData() + table = make_outbox_table(metadata, table_name=f"dlq_test_{uuid.uuid4().hex[:8]}") + dlq = make_dlq_table(metadata, table_name=f"dlq_dlq_{uuid.uuid4().hex[:8]}") + kwargs: dict[str, typing.Any] = {"outbox_table": table, "dlq_table": dlq} + if recorder is not None: + kwargs["metrics_recorder"] = recorder + broker = OutboxBroker(**kwargs) + + received: list[str] = [] + raised: list[Exception] = [] + + sub_kwargs: dict[str, typing.Any] = {} + if max_deliveries is not None: + sub_kwargs["max_deliveries"] = max_deliveries + if retry_strategy is not None: + sub_kwargs["retry_strategy"] = retry_strategy + if ack_policy is not None: + sub_kwargs["ack_policy"] = ack_policy + + @broker.subscriber("orders", **sub_kwargs) + async def handle(body: str) -> None: + received.append(body) + if raised: + raise raised[0] + + return broker, received, raised + + +async def test_fake_dlq_captures_retry_terminal_failure() -> None: + broker, _received, raised = _make_broker_with_dlq(retry_strategy=NoRetry()) + raised.append(RuntimeError("boom")) + test_broker = TestOutboxBroker(broker) + + async with test_broker: + await broker.publish("audit-me", queue="orders") # ty: ignore[missing-argument] + + assert test_broker.fake_client.rows == [] + assert len(test_broker.fake_client.dlq_rows) == 1 + row = test_broker.fake_client.dlq_rows[0] + assert row["queue"] == "orders" + assert row["failure_reason"] == "retry_terminal" + assert "RuntimeError" in row["last_exception"] + assert row["payload"] # encoded body present + assert row["original_id"] is not None + + +async def test_fake_dlq_captures_rejected_failure() -> None: + """``REJECT_ON_ERROR`` ack policy + raising handler → DLQ row with reason ``rejected``.""" + with _warnings.catch_warnings(): + # REJECT_ON_ERROR + a retry strategy triggers a misconfig warning at registration + # (the retry strategy is ignored). Pass NoRetry() to match the policy and ignore + # the warning — the runtime semantics we care about (single attempt → DLQ) hold. + _warnings.simplefilter("ignore", UserWarning) + broker, _received, raised = _make_broker_with_dlq( + ack_policy=AckPolicy.REJECT_ON_ERROR, + retry_strategy=NoRetry(), + ) + raised.append(ValueError("poison message")) + test_broker = TestOutboxBroker(broker) + + async with test_broker: + await broker.publish("poison", queue="orders") # ty: ignore[missing-argument] + + assert test_broker.fake_client.rows == [] + assert len(test_broker.fake_client.dlq_rows) == 1 + row = test_broker.fake_client.dlq_rows[0] + assert row["failure_reason"] == "rejected" + assert "ValueError" in row["last_exception"] + + +async def test_fake_dlq_captures_max_deliveries_failure() -> None: + """A pre-seeded row over the max_deliveries cap routes through allow_delivery to DLQ.""" + broker, _received, _raised = _make_broker_with_dlq(max_deliveries=1) + test_broker = TestOutboxBroker(broker) + + async with test_broker: + # Seed the row directly with deliveries_count past the cap so dispatch_one's + # allow_delivery check fires immediately instead of running the handler. + test_broker.fake_client.feed(queue="orders", payload=b'"x"') + row = test_broker.fake_client.rows[0] + row.deliveries_count = 5 + row.acquired_token = uuid.uuid4() + row.acquired_at = _dt.datetime.now(tz=_dt.UTC) + sub = next(iter(broker._subscribers)) # noqa: SLF001 + from faststream_outbox.testing import _to_inner # noqa: PLC0415 + + await sub.dispatch_one(_to_inner(row), writer_conn=None) + + assert test_broker.fake_client.rows == [] + assert len(test_broker.fake_client.dlq_rows) == 1 + assert test_broker.fake_client.dlq_rows[0]["failure_reason"] == "max_deliveries" + + +async def test_fake_dlq_unconfigured_silently_deletes() -> None: + """Without ``dlq_table=...``, terminal failures DELETE and leave no audit trail.""" + broker = _make_broker() + + @broker.subscriber("orders", retry_strategy=NoRetry()) + async def handle(body: str) -> None: + del body + msg = "boom" + raise RuntimeError(msg) + + test_broker = TestOutboxBroker(broker) + async with test_broker: + await broker.publish("x", queue="orders") # ty: ignore[missing-argument] + + assert test_broker.fake_client.rows == [] + assert test_broker.fake_client.dlq_rows == [] + + +async def test_fake_dlq_emits_dlq_written_metric() -> None: + events: list[tuple[str, dict]] = [] + + def recorder(event: str, tags: typing.Any) -> None: + events.append((event, dict(tags))) + + broker, _received, raised = _make_broker_with_dlq(retry_strategy=NoRetry(), recorder=recorder) + raised.append(RuntimeError("metric-boom")) + test_broker = TestOutboxBroker(broker) + + async with test_broker: + await broker.publish("x", queue="orders") # ty: ignore[missing-argument] + + dlq_events = [t for e, t in events if e == "dlq_written"] + assert len(dlq_events) == 1 + assert dlq_events[0]["failure_reason"] == "retry_terminal" + assert dlq_events[0]["exception_type"] == "RuntimeError" + assert dlq_events[0]["queue"] == "orders" + + +async def test_fake_dlq_not_emitted_on_handler_success() -> None: + events: list[tuple[str, dict]] = [] + + def recorder(event: str, tags: typing.Any) -> None: + events.append((event, dict(tags))) + + broker, _received, _raised = _make_broker_with_dlq(recorder=recorder) + test_broker = TestOutboxBroker(broker) + + async with test_broker: + await broker.publish("happy", queue="orders") # ty: ignore[missing-argument] + + assert not any(e == "dlq_written" for e, _ in events) + assert test_broker.fake_client.dlq_rows == [] + # And the row should be deleted (handler succeeded). + assert test_broker.fake_client.rows == [] diff --git a/tests/test_integration.py b/tests/test_integration.py index e0b1c2e..5f16970 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -12,7 +12,9 @@ from faststream_outbox import ( ConstantRetry, + NoRetry, OutboxBroker, + make_dlq_table, make_outbox_table, ) from faststream_outbox.client import OutboxClient @@ -1219,3 +1221,130 @@ async def handle(body: dict) -> None: # Three 0.3s handlers in parallel ~ 0.3s + slack. Sequential would be >= 0.9s. # 0.7s upper guard catches a regression to sequential broker.stop. assert elapsed < 0.7, f"broker.stop() took {elapsed:.3f}s — looks sequential" + + +# --- DLQ (issue #26) ----------------------------------------------------------- + + +async def _dlq_rows(engine: AsyncEngine, table: Table) -> list[dict]: + async with engine.connect() as conn: + result = await conn.execute(select(table).order_by(table.c.id)) + return [dict(row) for row in result.mappings().all()] + + +async def test_validate_schema_passes_for_correct_dlq_table( + pg_engine: AsyncEngine, + outbox_table: Table, + dlq_table: Table, +) -> None: + client = OutboxClient(pg_engine, outbox_table, dlq_table=dlq_table) + await client.validate_schema() + + +async def test_validate_schema_fails_for_missing_dlq_table( + pg_engine: AsyncEngine, + outbox_table: Table, +) -> None: + """When ``dlq_table`` is configured, validate_schema also checks the DLQ table.""" + metadata = MetaData() + missing_dlq = make_dlq_table(metadata, table_name="does_not_exist_dlq_xyz") + client = OutboxClient(pg_engine, outbox_table, dlq_table=missing_dlq) + with pytest.raises(RuntimeError, match="does_not_exist_dlq_xyz"): + await client.validate_schema() + + +async def test_dlq_atomic_insert_with_delete( + pg_engine: AsyncEngine, + outbox_table: Table, + dlq_table: Table, +) -> None: + """End-to-end: a terminal-failure handler triggers DELETE+INSERT in one CTE.""" + broker = OutboxBroker(pg_engine, outbox_table=outbox_table, dlq_table=dlq_table) + handled = asyncio.Event() + + @broker.subscriber("orders", retry_strategy=NoRetry(), min_fetch_interval=0.02) + async def handle(body: dict) -> None: + del body + handled.set() + msg = "always fails" + raise RuntimeError(msg) + + session_factory = async_sessionmaker(pg_engine, expire_on_commit=False) + async with broker: + async with session_factory() as session, session.begin(): + await broker.publish({"audit": True}, queue="orders", session=session, correlation_id="trace-dlq") + await asyncio.wait_for(handled.wait(), timeout=5.0) + # Exit ``async with broker`` triggers stop() → graceful drain, so the + # worker's terminal flush (the CTE) has fully committed before we assert. + + assert await _row_count(pg_engine, outbox_table) == 0 + rows = await _dlq_rows(pg_engine, dlq_table) + assert len(rows) == 1 + row = rows[0] + assert row["queue"] == "orders" + assert row["failure_reason"] == "retry_terminal" + assert row["last_exception"] is not None + assert "RuntimeError" in row["last_exception"] + assert row["headers"] is not None + assert row["headers"].get("correlation_id") == "trace-dlq" + assert row["original_id"] is not None + assert row["payload"] is not None + + +async def test_dlq_insert_failure_rolls_back_delete( + pg_engine: AsyncEngine, + outbox_table: Table, +) -> None: + """A DLQ-write failure rolls back the whole CTE — the outbox row stays leased.""" + # Configure the broker with a DLQ table that doesn't actually exist in the DB + # (we create the canonical schema but skip create_all so the INSERT fails). + metadata = MetaData() + missing_dlq = make_dlq_table(metadata, table_name=f"does_not_exist_dlq_{uuid.uuid4().hex[:8]}") + client = OutboxClient(pg_engine, outbox_table, dlq_table=missing_dlq) + + # Seed an outbox row and acquire its lease so we can drive ``delete_with_lease``. + async with pg_engine.begin() as conn: + await conn.execute(insert(outbox_table).values(queue="orders", payload=b"poison")) + async with pg_engine.connect() as fetch_conn: + rows = await client.fetch(fetch_conn, ["orders"], limit=1, lease_ttl_seconds=60.0) + assert len(rows) == 1 + row = rows[0] + + async with pg_engine.connect() as raw_conn: + writer_conn = await raw_conn.execution_options(isolation_level="AUTOCOMMIT") + with pytest.raises(Exception, match="does_not_exist_dlq"): + await client.delete_with_lease( + writer_conn, + row.id, + row.acquired_token, # ty: ignore[invalid-argument-type] + dlq_payload={"failure_reason": "retry_terminal", "last_exception": "RuntimeError('x')"}, + ) + + # Outbox row still present — the CTE was atomic, INSERT failure rolled back DELETE. + assert await _row_count(pg_engine, outbox_table) == 1 + + +async def test_dlq_cte_returns_false_when_lease_already_lost( + pg_engine: AsyncEngine, + outbox_table: Table, + dlq_table: Table, +) -> None: + """When the lease was reclaimed by another worker, the CTE deletes nothing AND writes no DLQ row.""" + client = OutboxClient(pg_engine, outbox_table, dlq_table=dlq_table) + async with pg_engine.begin() as conn: + await conn.execute(insert(outbox_table).values(queue="orders", payload=b"x")) + async with pg_engine.connect() as fetch_conn: + rows = await client.fetch(fetch_conn, ["orders"], limit=1, lease_ttl_seconds=60.0) + + async with pg_engine.connect() as raw_conn: + writer_conn = await raw_conn.execution_options(isolation_level="AUTOCOMMIT") + # Wrong token = lease lost. + deleted = await client.delete_with_lease( + writer_conn, + rows[0].id, + uuid.uuid4(), + dlq_payload={"failure_reason": "retry_terminal", "last_exception": "RuntimeError('x')"}, + ) + assert deleted is False + assert await _row_count(pg_engine, outbox_table) == 1 # outbox row still leased + assert await _row_count(pg_engine, dlq_table) == 0 # no DLQ row from a no-op CTE diff --git a/tests/test_unit.py b/tests/test_unit.py index 84bcbe2..cff3724 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -27,6 +27,7 @@ OutboxPublisher, OutboxRouter, TestOutboxBroker, + make_dlq_table, make_outbox_table, ) from faststream_outbox.client import OutboxClient, _validate_schema_sync @@ -1547,7 +1548,7 @@ async def test_dispatch_one_propagates_flush_error_when_writer_conn_set() -> Non """A flush error against a cached writer conn must propagate so _worker_loop can reconnect.""" class RaisingFake(FakeOutboxClient): - async def delete_with_lease(self, conn: object, message_id: int, acquired_token: uuid.UUID) -> bool: # noqa: ARG002 + async def delete_with_lease(self, *args: object, **kwargs: object) -> bool: # noqa: ARG002 msg = "writer conn poisoned" raise RuntimeError(msg) @@ -1564,7 +1565,7 @@ async def test_dispatch_one_swallows_flush_error_when_writer_conn_none() -> None """Legacy behavior: without a writer_conn, a raising delete is logged and swallowed.""" class RaisingFake(FakeOutboxClient): - async def delete_with_lease(self, conn: object, message_id: int, acquired_token: uuid.UUID) -> bool: # noqa: ARG002 + async def delete_with_lease(self, *args: object, **kwargs: object) -> bool: # noqa: ARG002 msg = "delete blew up" raise RuntimeError(msg) @@ -2415,3 +2416,243 @@ def raising_recorder(event: str, tags: typing.Any) -> None: # noqa: ARG001 assert row_id == 99 matching = [r for r in caplog.records if "metrics recorder raised" in r.getMessage() and r.exc_info is not None] assert matching, "expected DEBUG log 'metrics recorder raised' with exc_info" + + +# --- make_dlq_table ----------------------------------------------------------- + + +def test_make_dlq_table_columns_present() -> None: + metadata = MetaData() + t = make_dlq_table(metadata, table_name="my_dlq") + expected = { + "id", + "original_id", + "queue", + "payload", + "headers", + "deliveries_count", + "created_at", + "failed_at", + "failure_reason", + "last_exception", + } + assert {c.name for c in t.columns} == expected + assert t.name == "my_dlq" + + +def test_make_dlq_table_declares_queue_failed_index() -> None: + metadata = MetaData() + t = make_dlq_table(metadata, table_name="my_dlq") + idx = next(idx for idx in t.indexes if idx.name == "my_dlq_queue_failed_idx") + assert idx.unique is False + assert [c.name for c in idx.columns] == ["queue", "failed_at"] + + +def test_make_dlq_table_attaches_to_metadata() -> None: + metadata = MetaData() + make_dlq_table(metadata, table_name="audit_dlq") + assert "audit_dlq" in metadata.tables + + +def test_make_dlq_table_accepts_long_name_no_notify_check() -> None: + """Unlike ``make_outbox_table``, the DLQ has no NOTIFY channel — long names are allowed.""" + metadata = MetaData() + long_name = "x" * 80 # would exceed 63 bytes if a NOTIFY channel were derived + t = make_dlq_table(metadata, table_name=long_name) + assert t.name == long_name + + +# --- terminal_failure_reason wiring on OutboxInnerMessage --------------------- + + +async def test_terminal_failure_reason_set_on_max_deliveries() -> None: + msg = _make_msg(deliveries_count=10) + assert msg.allow_delivery(max_deliveries=5, logger=None) is False + assert msg.terminal_failure_reason == "max_deliveries" + + +async def test_terminal_failure_reason_set_on_retry_terminal_without_strategy() -> None: + msg = _make_msg() + await msg.nack() + assert msg.to_delete + assert msg.terminal_failure_reason == "retry_terminal" + + +async def test_terminal_failure_reason_set_on_retry_strategy_returning_none() -> None: + msg = _make_msg(retry_strategy=NoRetry()) + await msg.nack() + assert msg.terminal_failure_reason == "retry_terminal" + + +async def test_terminal_failure_reason_unset_when_retry_scheduled() -> None: + msg = _make_msg(retry_strategy=ConstantRetry(delay_seconds=60)) + await msg.nack() + assert msg.pending_delay_seconds == 60.0 + assert msg.terminal_failure_reason is None + + +async def test_terminal_failure_reason_set_on_reject() -> None: + msg = _make_msg() + await msg.reject() + assert msg.to_delete + assert msg.terminal_failure_reason == "rejected" + + +async def test_terminal_failure_reason_unset_on_ack() -> None: + msg = _make_msg() + await msg.ack() + assert msg.to_delete + assert msg.terminal_failure_reason is None + + +# --- _flush_terminal DLQ wiring ----------------------------------------------- + + +def _make_broker_with_dlq(recorder: typing.Any = None) -> tuple[OutboxBroker, TestOutboxBroker]: + metadata = MetaData() + table = make_outbox_table(metadata, table_name="outbox") + dlq = make_dlq_table(metadata, table_name="outbox_dlq") + kwargs: dict[str, typing.Any] = {"outbox_table": table, "dlq_table": dlq} + if recorder is not None: + kwargs["metrics_recorder"] = recorder + broker = OutboxBroker(**kwargs) + + @broker.subscriber("orders", max_deliveries=1) + async def handle(body: dict) -> None: ... + + test_broker = TestOutboxBroker(broker) + return broker, test_broker + + +async def test_flush_terminal_builds_dlq_payload_when_failure_reason_set() -> None: + """A terminal failure with ``dlq_table`` configured threads dlq_payload through.""" + broker, test_broker = _make_broker_with_dlq() + fake = FakeOutboxClient() + test_broker.fake_client = fake + msg = _make_msg(id=7, queue="orders", deliveries_count=3) + msg.terminal_failure_reason = "max_deliveries" + msg.last_exception = RuntimeError("boom") + + with patch.object(fake, "delete_with_lease", new=AsyncMock(return_value=True)) as spy: + async with test_broker: + sub = next(iter(broker._subscribers)) # noqa: SLF001 + await sub._flush_terminal(msg, writer_conn=None) # noqa: SLF001 + + spy.assert_awaited_once() + assert spy.await_args is not None + kwargs = spy.await_args.kwargs + assert kwargs["dlq_payload"]["failure_reason"] == "max_deliveries" + assert "RuntimeError" in kwargs["dlq_payload"]["last_exception"] + + +async def test_flush_terminal_no_dlq_payload_on_ack_path() -> None: + """Success-by-ack reaches _flush_terminal too (via to_delete) but reason stays None → no DLQ.""" + broker, test_broker = _make_broker_with_dlq() + fake = FakeOutboxClient() + test_broker.fake_client = fake + msg = _make_msg(id=8, queue="orders") + # terminal_failure_reason stays None (handler succeeded). + + with patch.object(fake, "delete_with_lease", new=AsyncMock(return_value=True)) as spy: + async with test_broker: + sub = next(iter(broker._subscribers)) # noqa: SLF001 + await sub._flush_terminal(msg, writer_conn=None) # noqa: SLF001 + + spy.assert_awaited_once() + assert spy.await_args is not None + assert spy.await_args.kwargs["dlq_payload"] is None + + +async def test_flush_terminal_no_dlq_payload_when_dlq_unconfigured() -> None: + """Broker without ``dlq_table`` never builds dlq_payload, even on terminal failure.""" + fake = FakeOutboxClient() + broker, test_broker = _make_broker_for_dispatch(fake) + msg = _make_msg(id=9, queue="orders", deliveries_count=3) + msg.terminal_failure_reason = "max_deliveries" + + with patch.object(fake, "delete_with_lease", new=AsyncMock(return_value=True)) as spy: + async with test_broker: + sub = next(iter(broker._subscribers)) # noqa: SLF001 + await sub._flush_terminal(msg, writer_conn=None) # noqa: SLF001 + + spy.assert_awaited_once() + assert spy.await_args is not None + assert spy.await_args.kwargs["dlq_payload"] is None + + +async def test_flush_terminal_emits_dlq_written_metric_after_successful_delete() -> None: + events, recorder = _events_recorder() + broker, test_broker = _make_broker_with_dlq(recorder) + fake = FakeOutboxClient() + test_broker.fake_client = fake + msg = _make_msg(id=10, queue="orders", deliveries_count=3) + msg.terminal_failure_reason = "retry_terminal" + msg.last_exception = RuntimeError("boom") + + with patch.object(fake, "delete_with_lease", new=AsyncMock(return_value=True)): + async with test_broker: + sub = next(iter(broker._subscribers)) # noqa: SLF001 + await sub._flush_terminal(msg, writer_conn=None) # noqa: SLF001 + + dlq_events = [t for e, t in events if e == "dlq_written"] + assert len(dlq_events) == 1 + assert dlq_events[0]["failure_reason"] == "retry_terminal" + assert dlq_events[0]["exception_type"] == "RuntimeError" + assert dlq_events[0]["queue"] == "orders" + + +async def test_flush_terminal_does_not_emit_dlq_written_on_lease_lost() -> None: + """Lease-lost path (delete returned False) must skip the dlq_written emission.""" + events, recorder = _events_recorder() + broker, test_broker = _make_broker_with_dlq(recorder) + fake = FakeOutboxClient() + test_broker.fake_client = fake + msg = _make_msg(id=11, queue="orders", deliveries_count=3) + msg.terminal_failure_reason = "max_deliveries" + + with patch.object(fake, "delete_with_lease", new=AsyncMock(return_value=False)): + async with test_broker: + sub = next(iter(broker._subscribers)) # noqa: SLF001 + await sub._flush_terminal(msg, writer_conn=None) # noqa: SLF001 + + assert not any(e == "dlq_written" for e, _ in events) + assert [e for e, _ in events if e == "lease_lost"] + + +async def test_flush_terminal_dlq_payload_includes_repr_of_exception() -> None: + """``last_exception`` is serialized via ``repr()`` — compact, type-aware.""" + broker, test_broker = _make_broker_with_dlq() + fake = FakeOutboxClient() + test_broker.fake_client = fake + msg = _make_msg(id=12, queue="orders") + msg.terminal_failure_reason = "rejected" + msg.last_exception = ValueError("invalid payload") + + with patch.object(fake, "delete_with_lease", new=AsyncMock(return_value=True)) as spy: + async with test_broker: + sub = next(iter(broker._subscribers)) # noqa: SLF001 + await sub._flush_terminal(msg, writer_conn=None) # noqa: SLF001 + + assert spy.await_args is not None + payload = spy.await_args.kwargs["dlq_payload"] + assert payload["last_exception"] == repr(ValueError("invalid payload")) + + +async def test_flush_terminal_dlq_payload_last_exception_none_when_no_exc() -> None: + """Manual ``reject()`` without an exception → DLQ row with ``last_exception=None``.""" + broker, test_broker = _make_broker_with_dlq() + fake = FakeOutboxClient() + test_broker.fake_client = fake + msg = _make_msg(id=13, queue="orders") + msg.terminal_failure_reason = "rejected" + # last_exception is None — operator chose to drop, no exception context. + + with patch.object(fake, "delete_with_lease", new=AsyncMock(return_value=True)) as spy: + async with test_broker: + sub = next(iter(broker._subscribers)) # noqa: SLF001 + await sub._flush_terminal(msg, writer_conn=None) # noqa: SLF001 + + assert spy.await_args is not None + payload = spy.await_args.kwargs["dlq_payload"] + assert payload["failure_reason"] == "rejected" + assert payload["last_exception"] is None From a03af4d52d47991c8300109e8fa9510e3c08f153 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Sat, 30 May 2026 11:00:55 +0300 Subject: [PATCH 2/3] refactor: DLQ review followups (metric branching, exception truncation, bundled adapters) Address review findings on the opt-in DLQ feature: - nacked_terminal metric now branches on terminal_failure_reason before last_exception, fixing manual msg.reject() being mis-tagged as "acked" and REJECT_ON_ERROR exceptions being mis-tagged as reason="retry_terminal". - dlq_written event surfaced in both bundled adapters (faststream_outbox_dlq_written_total in Prometheus, messaging.outbox.dlq_written in OpenTelemetry) and added to the vocabulary docstring. - last_exception repr() bounded at 8 KiB via _LAST_EXCEPTION_MAX_CHARS so pathological exceptions (MB-scale validation errors, asyncpg DataError) can't stall the writer round-trip. - DLQ failure_reason column bumped from String(32) to String(64); added DLQFailureReason Literal type as the public contract for the reason set. - Hoisted the inline _to_inner import in tests/test_fake.py to module top. Co-Authored-By: Claude Opus 4.7 (1M context) --- faststream_outbox/message.py | 14 ++- faststream_outbox/metrics/__init__.py | 14 ++- faststream_outbox/metrics/opentelemetry.py | 18 ++++ faststream_outbox/metrics/prometheus.py | 17 +++- faststream_outbox/schema.py | 4 +- faststream_outbox/subscriber/usecase.py | 58 +++++++---- tests/test_fake.py | 4 +- tests/test_metrics_opentelemetry.py | 24 ++++- tests/test_metrics_prometheus.py | 25 ++++- tests/test_unit.py | 113 ++++++++++++++++++++- 10 files changed, 257 insertions(+), 34 deletions(-) diff --git a/faststream_outbox/message.py b/faststream_outbox/message.py index 1ab87dd..edbdfe8 100644 --- a/faststream_outbox/message.py +++ b/faststream_outbox/message.py @@ -14,7 +14,7 @@ import uuid from collections.abc import Awaitable, Callable from dataclasses import dataclass, field -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Literal from faststream.message.message import StreamMessage @@ -25,6 +25,13 @@ from faststream_outbox.retry import RetryStrategyProto +# Public contract for the DLQ ``failure_reason`` column and the ``reason`` tag on +# ``nacked_terminal`` / ``dlq_written`` metric events. Operators query against these +# literals (and dashboards key labels off them) — adding a new value is a public +# API change. The DLQ column is sized to accommodate growth; see ``schema.py``. +DLQFailureReason = Literal["max_deliveries", "retry_terminal", "rejected"] + + def _utcnow() -> _dt.datetime: return _dt.datetime.now(tz=_dt.UTC) @@ -62,9 +69,10 @@ class OutboxInnerMessage: pending_delay_seconds: float | None = field(default=None, init=False) # Set on terminal-failure paths (``allow_delivery`` False, ``_nack`` exhausted, # ``_reject``). ``_flush_terminal`` reads it to decide whether to build a DLQ - # payload. Stays ``None`` on the success (``_ack``) path so handler-success + # payload; ``dispatch_one`` reads it to pick the ``nacked_terminal`` reason + # tag. Stays ``None`` on the success (``_ack``) path so handler-success # never touches the DLQ. - terminal_failure_reason: str | None = field(default=None, init=False) + terminal_failure_reason: "DLQFailureReason | None" = field(default=None, init=False) async def ack(self) -> None: await self._update_state_if_not_set(self._ack) diff --git a/faststream_outbox/metrics/__init__.py b/faststream_outbox/metrics/__init__.py index 3b340cb..de7bf19 100644 --- a/faststream_outbox/metrics/__init__.py +++ b/faststream_outbox/metrics/__init__.py @@ -15,9 +15,19 @@ * ``acked`` — handler returned cleanly. Tags include ``duration_seconds``. * ``nacked_retried`` — handler raised, retry scheduled. Tags include ``duration_seconds, next_delay_seconds, exception_type``. -* ``nacked_terminal`` — terminal failure. Tags include ``reason`` (``max_deliveries`` | ``retry_terminal``); - ``duration_seconds`` is present only for ``retry_terminal``. +* ``nacked_terminal`` — terminal failure. Tags include ``reason`` (``max_deliveries`` | + ``retry_terminal`` | ``rejected``). ``duration_seconds`` is present for the post-handler + reasons (``retry_terminal``, ``rejected``) and absent for ``max_deliveries`` (no handler + ran). ``exception_type`` is present when ``last_exception`` was set (post-handler raises; + manual ``msg.reject()`` may omit it). * ``lease_lost`` — terminal flush found a foreign lease. Tags include ``phase`` (``terminal`` | ``retry``). +* ``dlq_written`` — emitted from ``_flush_terminal`` after the DELETE+INSERT CTE commits. + Fires only when ``OutboxBroker`` was constructed with ``dlq_table=...`` AND the row was + terminal-by-failure (any ``nacked_terminal`` reason). Skipped on lease-lost. Tags: + ``queue, subscriber, deliveries_count, failure_reason`` (same value set as + ``nacked_terminal``'s ``reason`` tag), and ``exception_type`` when ``last_exception`` was + set. Pair with ``nacked_terminal`` to alert on "row failed but audit didn't land" + (``nacked_terminal`` rate > ``dlq_written`` rate). * ``published`` — producer-side insert. Tags include ``status`` (``success`` | ``error``), ``count, size_bytes, duration_seconds``. No ``subscriber`` tag. ``count`` is **messages landed**, not publish attempts — errors and ``timer_id`` diff --git a/faststream_outbox/metrics/opentelemetry.py b/faststream_outbox/metrics/opentelemetry.py index 9faa711..70e5de9 100644 --- a/faststream_outbox/metrics/opentelemetry.py +++ b/faststream_outbox/metrics/opentelemetry.py @@ -65,6 +65,7 @@ _ATTR_STATUS = "messaging.outbox.status" # acked | nacked | error _ATTR_TERMINAL_REASON = "messaging.outbox.terminal_reason" _ATTR_LEASE_PHASE = "messaging.outbox.lease_phase" +_ATTR_DLQ_REASON = "messaging.outbox.dlq_reason" class OpenTelemetryRecorder: @@ -140,6 +141,14 @@ def __init__( unit="event", description="Lease-token mismatches on terminal write", ) + # Pairs with the ``nacked_terminal`` event so dashboards can compare + # "row failed terminally" against "audit landed" to detect DLQ + # misconfiguration (schema mismatch, etc.) without silent data loss. + self._dlq_written = self._meter.create_counter( + name="messaging.outbox.dlq_written", + unit="event", + description="DLQ audit rows written by terminal flush, broken down by reason", + ) def _attrs(self, tags: Mapping[str, typing.Any], *, operation: str) -> dict[str, typing.Any]: attrs: dict[str, typing.Any] = { @@ -186,6 +195,15 @@ def __call__(self, event: str, tags: Mapping[str, typing.Any]) -> None: # noqa: self._lease_lost.add(1, attrs) return + if event == "dlq_written": + attrs = self._attrs(tags, operation="process") + attrs[_ATTR_DLQ_REASON] = tags["failure_reason"] + exc = tags.get("exception_type") + if exc is not None: + attrs[_ATTR_ERROR_TYPE] = exc + self._dlq_written.add(1, attrs) + return + if event == "published": attrs = self._attrs(tags, operation="publish") attrs[_ATTR_STATUS] = tags.get("status", "success") diff --git a/faststream_outbox/metrics/prometheus.py b/faststream_outbox/metrics/prometheus.py index 90360a3..eab02f6 100644 --- a/faststream_outbox/metrics/prometheus.py +++ b/faststream_outbox/metrics/prometheus.py @@ -203,6 +203,15 @@ def __init__( [*consume_labels, "phase"], registry=registry, ) + # Pairs with ``_terminal_reason`` so dashboards can compare "row failed + # terminally" vs "audit landed" — divergence signals DLQ misconfiguration + # (schema mismatch, etc.) without silent data loss. + self._dlq_written = Counter( + f"{p}_outbox_dlq_written_total", + "DLQ audit rows written by terminal flush, broken down by reason.", + [*consume_labels, "reason"], + registry=registry, + ) def _resolve_custom_values(self, tags: Mapping[str, typing.Any]) -> tuple[str, ...]: return tuple( @@ -225,7 +234,7 @@ def _publish_values(self, tags: Mapping[str, typing.Any]) -> tuple[str, ...]: destination = tags.get("queue", "") return (self._app_name, BROKER_SYSTEM, destination, *self._resolve_custom_values(tags)) - def __call__(self, event: str, tags: Mapping[str, typing.Any]) -> None: # noqa: C901 + def __call__(self, event: str, tags: Mapping[str, typing.Any]) -> None: # noqa: C901, PLR0912 consume_base = self._consume_values(tags) if event == "fetched": @@ -264,6 +273,10 @@ def __call__(self, event: str, tags: Mapping[str, typing.Any]) -> None: # noqa: self._lease_lost.labels(*consume_base, tags["phase"]).inc() return + if event == "dlq_written": + self._dlq_written.labels(*consume_base, tags["failure_reason"]).inc() + return + if event == "published": publish_base = self._publish_values(tags) status = tags.get("status", "success") @@ -283,7 +296,7 @@ def __call__(self, event: str, tags: Mapping[str, typing.Any]) -> None: # noqa: if exc is not None: self._published_exceptions.labels(*publish_base, exc).inc() return - # Unknown event — silently ignored so future events (e.g. ``dlq_written``) + # Unknown event — silently ignored so future event vocabulary additions # don't break old recorders. diff --git a/faststream_outbox/schema.py b/faststream_outbox/schema.py index c134fb8..ea1f6f6 100644 --- a/faststream_outbox/schema.py +++ b/faststream_outbox/schema.py @@ -134,7 +134,9 @@ def make_dlq_table(metadata: "MetaData", table_name: str = "outbox_dlq") -> Tabl Column("deliveries_count", BigInteger, nullable=False), Column("created_at", DateTime(timezone=True), nullable=False), Column("failed_at", DateTime(timezone=True), nullable=False, server_default=func.now()), - Column("failure_reason", String(32), nullable=False), + # 64 gives breathing room past the current 14-byte max ("retry_terminal") so the + # canonical set can grow without a migration that rewrites every audit row. + Column("failure_reason", String(64), nullable=False), Column("last_exception", String, nullable=True), ) Index(f"{table_name}_queue_failed_idx", table.c.queue, table.c.failed_at) diff --git a/faststream_outbox/subscriber/usecase.py b/faststream_outbox/subscriber/usecase.py index b034ea3..403c3f3 100644 --- a/faststream_outbox/subscriber/usecase.py +++ b/faststream_outbox/subscriber/usecase.py @@ -50,6 +50,15 @@ _BACKOFF_EXP_CAP = 30 _BACKOFF_MAX_SECONDS = 30.0 +# Cap for the ``last_exception`` string written to the DLQ. Some exceptions carry +# huge payloads (validation errors with the full request body, asyncpg ``DataError`` +# with the rejected row, etc.). An unbounded ``repr`` would extend the writer +# round-trip on a poison row by hundreds of ms and bloat the DLQ table. 8 KiB is +# generous enough to keep tracebacks and structured detail intact while bounding +# worst-case write cost. Truncation appends ``…[truncated]``. +_LAST_EXCEPTION_MAX_CHARS = 8192 +_TRUNCATION_SUFFIX = "…[truncated]" + _UNSUPPORTED_PEEK_MSG = ( "OutboxBroker does not support get_one() / async iteration. " "Use `broker.fetch_unprocessed(session=..., queue=...)` for lease-free read access." @@ -71,6 +80,17 @@ def _compute_backoff(attempt: int, ceiling: float, *, base: float = 1.0) -> floa return min(base * (2.0 ** (attempt - 1)) * random.uniform(0.5, 1.5), ceiling) # noqa: S311 +def _truncate_exception(exc: BaseException | None) -> str | None: + """Render *exc* via ``repr`` and bound it to ``_LAST_EXCEPTION_MAX_CHARS``.""" + if exc is None: + return None + rendered = repr(exc) + if len(rendered) <= _LAST_EXCEPTION_MAX_CHARS: + return rendered + keep = _LAST_EXCEPTION_MAX_CHARS - len(_TRUNCATION_SUFFIX) + return rendered[:keep] + _TRUNCATION_SUFFIX + + if typing.TYPE_CHECKING: from faststream._internal.endpoint.publisher import PublisherProto from faststream._internal.endpoint.subscriber.call_item import CallsCollection @@ -485,26 +505,24 @@ async def dispatch_one( await row.assert_state_set(logger) duration_seconds = time.perf_counter() - start_perf common = {**base, "deliveries_count": row.deliveries_count, "duration_seconds": duration_seconds} - if row.last_exception is None: - self._emit_metric("acked", common) + # Branch on ``terminal_failure_reason`` (set by ``_nack``/``_reject``) before + # ``last_exception`` so manual ``msg.reject()`` (no exception raised) still + # surfaces as ``nacked_terminal(reason="rejected")``, and ack-policy-driven + # rejects (REJECT_ON_ERROR) carry ``reason="rejected"`` rather than the + # incorrect ``"retry_terminal"`` they got under the old ``last_exception``-first + # ordering. Successful handlers leave ``terminal_failure_reason=None``. + if row.terminal_failure_reason is not None: + terminal_tags: dict[str, typing.Any] = {**common, "reason": row.terminal_failure_reason} + if row.last_exception is not None: + terminal_tags["exception_type"] = type(row.last_exception).__name__ + self._emit_metric("nacked_terminal", terminal_tags) elif row.pending_delay_seconds is not None: - self._emit_metric( - "nacked_retried", - { - **common, - "next_delay_seconds": row.pending_delay_seconds, - "exception_type": type(row.last_exception).__name__, - }, - ) - elif row.to_delete: - self._emit_metric( - "nacked_terminal", - { - **common, - "reason": "retry_terminal", - "exception_type": type(row.last_exception).__name__, - }, - ) + retry_tags: dict[str, typing.Any] = {**common, "next_delay_seconds": row.pending_delay_seconds} + if row.last_exception is not None: + retry_tags["exception_type"] = type(row.last_exception).__name__ + self._emit_metric("nacked_retried", retry_tags) + else: + self._emit_metric("acked", common) await self._safe_flush(row, terminal=row.to_delete, writer_conn=writer_conn) async def _safe_flush( @@ -548,7 +566,7 @@ async def _flush_terminal( if row.terminal_failure_reason is not None and self._outer_config.dlq_table is not None: dlq_payload = { "failure_reason": row.terminal_failure_reason, - "last_exception": repr(row.last_exception) if row.last_exception is not None else None, + "last_exception": _truncate_exception(row.last_exception), } deleted = await self._client.delete_with_lease( writer_conn, diff --git a/tests/test_fake.py b/tests/test_fake.py index 998afd9..85aa0af 100644 --- a/tests/test_fake.py +++ b/tests/test_fake.py @@ -41,7 +41,7 @@ from faststream_outbox.envelope import _encode_payload as encode_payload from faststream_outbox.router import OutboxRoute from faststream_outbox.subscriber.config import OutboxSubscriberConfig -from faststream_outbox.testing import FakeOutboxClient, FakeOutboxProducer, _FakeRow +from faststream_outbox.testing import FakeOutboxClient, FakeOutboxProducer, _FakeRow, _to_inner def _fake_session() -> AsyncMock: @@ -1390,8 +1390,6 @@ async def test_fake_dlq_captures_max_deliveries_failure() -> None: row.acquired_token = uuid.uuid4() row.acquired_at = _dt.datetime.now(tz=_dt.UTC) sub = next(iter(broker._subscribers)) # noqa: SLF001 - from faststream_outbox.testing import _to_inner # noqa: PLC0415 - await sub.dispatch_one(_to_inner(row), writer_conn=None) assert test_broker.fake_client.rows == [] diff --git a/tests/test_metrics_opentelemetry.py b/tests/test_metrics_opentelemetry.py index ee7ccbd..d1abda0 100644 --- a/tests/test_metrics_opentelemetry.py +++ b/tests/test_metrics_opentelemetry.py @@ -117,10 +117,32 @@ def test_otel_published_records_publish_duration() -> None: def test_otel_unknown_event_is_silently_ignored() -> None: reader, rec = _reader_and_recorder() - rec("dlq_written", {"queue": "q", "subscriber": "h"}) # forward-compat + rec("future_event_not_yet_added", {"queue": "q", "subscriber": "h"}) # forward-compat assert _collect_metrics(reader) == {} +def test_otel_dlq_written_emits_counter_with_reason_attr() -> None: + reader, rec = _reader_and_recorder() + rec( + "dlq_written", + { + "queue": "q", + "subscriber": "h", + "deliveries_count": 3, + "failure_reason": "retry_terminal", + "exception_type": "RuntimeError", + }, + ) + metrics = _collect_metrics(reader) + assert "messaging.outbox.dlq_written" in metrics + counter_points = metrics["messaging.outbox.dlq_written"].data_points + assert sum(p.value for p in counter_points) == 1 + attrs = counter_points[0].attributes + assert attrs is not None + assert attrs["messaging.outbox.dlq_reason"] == "retry_terminal" + assert attrs["error.type"] == "RuntimeError" + + def test_otel_meter_argument_takes_precedence_over_meter_provider() -> None: reader = InMemoryMetricReader() provider = MeterProvider(metric_readers=[reader]) diff --git a/tests/test_metrics_prometheus.py b/tests/test_metrics_prometheus.py index 5d0d0bd..4ec182a 100644 --- a/tests/test_metrics_prometheus.py +++ b/tests/test_metrics_prometheus.py @@ -161,7 +161,30 @@ def test_prometheus_published_exception_uses_destination_label() -> None: def test_prometheus_unknown_event_is_silently_ignored() -> None: _, rec = _make_recorder() - rec("dlq_written", {"queue": "q", "subscriber": "h"}) # forward-compat + rec("future_event_not_yet_added", {"queue": "q", "subscriber": "h"}) # forward-compat + + +def test_prometheus_dlq_written_records_reason_label() -> None: + reg, rec = _make_recorder() + rec( + "dlq_written", + { + "queue": "q", + "subscriber": "h", + "deliveries_count": 3, + "failure_reason": "retry_terminal", + "exception_type": "RuntimeError", + }, + ) + assert _sample(reg, "faststream_outbox_dlq_written_total", {**_base_labels(), "reason": "retry_terminal"}) == 1.0 + + +def test_prometheus_dlq_written_supports_all_failure_reasons() -> None: + reg, rec = _make_recorder() + for reason in ("max_deliveries", "retry_terminal", "rejected"): + rec("dlq_written", {"queue": "q", "subscriber": "h", "failure_reason": reason}) + for reason in ("max_deliveries", "retry_terminal", "rejected"): + assert _sample(reg, "faststream_outbox_dlq_written_total", {**_base_labels(), "reason": reason}) == 1.0 def test_prometheus_app_name_label_is_applied() -> None: diff --git a/tests/test_unit.py b/tests/test_unit.py index cff3724..feb80bb 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -30,6 +30,7 @@ make_dlq_table, make_outbox_table, ) +from faststream_outbox.annotations import OutboxMessage as AnnotatedOutboxMessage from faststream_outbox.client import OutboxClient, _validate_schema_sync from faststream_outbox.configs import OutboxBrokerConfig from faststream_outbox.envelope import _encode_payload @@ -40,7 +41,12 @@ from faststream_outbox.publisher.producer import OutboxProducer from faststream_outbox.publisher.specification import OutboxPublisherSpecification from faststream_outbox.response import OutboxPublishCommand -from faststream_outbox.subscriber.usecase import OutboxSubscriber, _compute_backoff +from faststream_outbox.subscriber.usecase import ( + _LAST_EXCEPTION_MAX_CHARS, + _TRUNCATION_SUFFIX, + OutboxSubscriber, + _compute_backoff, +) from faststream_outbox.testing import FakeOutboxClient, FakeOutboxProducer @@ -2656,3 +2662,108 @@ async def test_flush_terminal_dlq_payload_last_exception_none_when_no_exc() -> N payload = spy.await_args.kwargs["dlq_payload"] assert payload["failure_reason"] == "rejected" assert payload["last_exception"] is None + + +# --- last_exception truncation in DLQ payload -------------------------------- + + +async def test_flush_terminal_dlq_payload_truncates_long_exception_repr() -> None: + """A pathological exception with a multi-MB ``repr`` is truncated before write.""" + broker, test_broker = _make_broker_with_dlq() + fake = FakeOutboxClient() + test_broker.fake_client = fake + msg = _make_msg(id=14, queue="orders") + msg.terminal_failure_reason = "retry_terminal" + # Build an exception whose ``repr`` is much larger than the cap. + huge_payload = "x" * (_LAST_EXCEPTION_MAX_CHARS * 3) + msg.last_exception = RuntimeError(huge_payload) + + with patch.object(fake, "delete_with_lease", new=AsyncMock(return_value=True)) as spy: + async with test_broker: + sub = next(iter(broker._subscribers)) # noqa: SLF001 + await sub._flush_terminal(msg, writer_conn=None) # noqa: SLF001 + + assert spy.await_args is not None + rendered = spy.await_args.kwargs["dlq_payload"]["last_exception"] + assert rendered is not None + assert len(rendered) == _LAST_EXCEPTION_MAX_CHARS + assert rendered.endswith(_TRUNCATION_SUFFIX) + + +async def test_flush_terminal_dlq_payload_short_exception_not_truncated() -> None: + """A normal-sized exception ``repr`` passes through untouched.""" + broker, test_broker = _make_broker_with_dlq() + fake = FakeOutboxClient() + test_broker.fake_client = fake + msg = _make_msg(id=15, queue="orders") + msg.terminal_failure_reason = "retry_terminal" + msg.last_exception = ValueError("short message") + + with patch.object(fake, "delete_with_lease", new=AsyncMock(return_value=True)) as spy: + async with test_broker: + sub = next(iter(broker._subscribers)) # noqa: SLF001 + await sub._flush_terminal(msg, writer_conn=None) # noqa: SLF001 + + assert spy.await_args is not None + assert spy.await_args.kwargs["dlq_payload"]["last_exception"] == repr(ValueError("short message")) + + +# --- nacked_terminal metric reason on manual reject() / REJECT_ON_ERROR ----- + + +async def test_metrics_manual_reject_without_exception_emits_nacked_terminal_rejected() -> None: + """ + Manual ``msg.reject()`` (no exception raised) emits ``nacked_terminal(reason="rejected")``. + + Previously routed to ``acked`` because ``last_exception is None`` was checked first. + """ + events, recorder = _events_recorder() + metadata = MetaData() + table = make_outbox_table(metadata) + broker = OutboxBroker(outbox_table=table, metrics_recorder=recorder) + + @broker.subscriber("orders", ack_policy=AckPolicy.MANUAL) + async def handle(body: dict, msg: AnnotatedOutboxMessage) -> None: + del body + await msg.reject() + + session = _make_session_mock() + async with TestOutboxBroker(broker): + await broker.publish({"x": 1}, queue="orders", session=session) + + terminals = [t for e, t in events if e == "nacked_terminal"] + assert len(terminals) == 1 + assert terminals[0]["reason"] == "rejected" + assert "exception_type" not in terminals[0] + assert not any(e == "acked" for e, _ in events) + + +async def test_metrics_reject_on_error_terminal_emits_reason_rejected() -> None: + """ + REJECT_ON_ERROR + handler raise emits ``reason="rejected"`` (was ``"retry_terminal"``). + + The metric branch previously hardcoded ``"retry_terminal"`` for the post-handler + terminal path; it now reads ``terminal_failure_reason`` and includes ``exception_type``. + """ + events, recorder = _events_recorder() + metadata = MetaData() + table = make_outbox_table(metadata) + broker = OutboxBroker(outbox_table=table, metrics_recorder=recorder) + + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + + @broker.subscriber("orders", ack_policy=AckPolicy.REJECT_ON_ERROR) + async def handle(body: dict) -> None: + del body + err = "explode" + raise RuntimeError(err) + + session = _make_session_mock() + async with TestOutboxBroker(broker): + await broker.publish({"x": 1}, queue="orders", session=session) + + terminals = [t for e, t in events if e == "nacked_terminal"] + assert len(terminals) == 1 + assert terminals[0]["reason"] == "rejected" + assert terminals[0]["exception_type"] == "RuntimeError" From 6ad92ef41b4d70ce852f1a86f3b1de4205394649 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Sat, 30 May 2026 11:01:19 +0300 Subject: [PATCH 3/3] docs: document opt-in DLQ feature - New docs/usage/dlq.md page (~150 lines) covering quickstart, the three failure_reason values, schema reference, atomic CTE guarantees, last_exception truncation, validate_schema integration, the dlq_written metric (Prometheus + OTel), retention guidance, and a test-broker example. - Wire it into mkdocs.yml nav and docs/index.md between Subscriber and Publisher (sequel to subscriber-side failure handling). - README: one-line DLQ pointer in "How it works"; fix Acknowledgements paragraph that incorrectly claimed "no archive table". - CLAUDE.md: cross-link to the new usage page from the existing Opt-in DLQ architecture section. Co-Authored-By: Claude Opus 4.7 (1M context) --- CLAUDE.md | 18 +++- README.md | 4 +- docs/index.md | 1 + docs/usage/dlq.md | 208 ++++++++++++++++++++++++++++++++++++++++++++++ mkdocs.yml | 1 + 5 files changed, 228 insertions(+), 4 deletions(-) create mode 100644 docs/usage/dlq.md diff --git a/CLAUDE.md b/CLAUDE.md index 186599d..86062c6 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -46,7 +46,21 @@ Latency floor: timer firing latency is bounded by `max_fetch_interval` (default ### User-owned schema -`make_outbox_table(metadata, table_name="outbox")` returns a `sqlalchemy.Table` attached to the user's `MetaData`. The package never creates or migrates the table — that's Alembic's job — but it **does** declare three indexes on the table itself so Alembic autogenerate brings them up: the partial `(queue, next_attempt_at) WHERE acquired_token IS NULL` backs the fetch CTE's Branch A (unleased rows); the partial `(queue, acquired_at) WHERE acquired_token IS NOT NULL` backs Branch B (expired-lease reclaim); and the partial unique `(queue, timer_id) WHERE timer_id IS NOT NULL` enforces `timer_id` dedup. The fetch CTE's OR is written so each disjunct explicitly carries the matching partial-index predicate as a conjunct — Postgres only uses a partial index when the query implies its WHERE clause, so the naive `acquired_at < cutoff` form would not engage `_lease_idx` and would fall back to seq-scan. Both fetch-side indexes pay write amplification on every claim (the fetch UPDATE writes `acquired_token` and `acquired_at`), traded for bounded scan cost under sustained lease expiry. `validate_schema()` is **opt-in** (call from `/health` or a startup hook, not `broker.start()`) so migrations can run against the same DB without a startup loop. There is **no** `state` column: a row is "available" iff its lease is unset (`acquired_token IS NULL`) or expired (`acquired_at < now() - lease_ttl_seconds`). Terminal failures `DELETE` (no archive, no DLQ). +`make_outbox_table(metadata, table_name="outbox")` returns a `sqlalchemy.Table` attached to the user's `MetaData`. The package never creates or migrates the table — that's Alembic's job — but it **does** declare three indexes on the table itself so Alembic autogenerate brings them up: the partial `(queue, next_attempt_at) WHERE acquired_token IS NULL` backs the fetch CTE's Branch A (unleased rows); the partial `(queue, acquired_at) WHERE acquired_token IS NOT NULL` backs Branch B (expired-lease reclaim); and the partial unique `(queue, timer_id) WHERE timer_id IS NOT NULL` enforces `timer_id` dedup. The fetch CTE's OR is written so each disjunct explicitly carries the matching partial-index predicate as a conjunct — Postgres only uses a partial index when the query implies its WHERE clause, so the naive `acquired_at < cutoff` form would not engage `_lease_idx` and would fall back to seq-scan. Both fetch-side indexes pay write amplification on every claim (the fetch UPDATE writes `acquired_token` and `acquired_at`), traded for bounded scan cost under sustained lease expiry. `validate_schema()` is **opt-in** (call from `/health` or a startup hook, not `broker.start()`) so migrations can run against the same DB without a startup loop. There is **no** `state` column: a row is "available" iff its lease is unset (`acquired_token IS NULL`) or expired (`acquired_at < now() - lease_ttl_seconds`). Terminal failures `DELETE` by default; opt in to audit via `OutboxBroker(..., dlq_table=make_dlq_table(metadata))` — see "Opt-in DLQ" below. + +### Opt-in DLQ on terminal failure + +`make_dlq_table(metadata, table_name="outbox_dlq")` returns a sibling audit table; pass it to `OutboxBroker(..., dlq_table=...)` to archive terminal failures. Default broker behavior is unchanged when `dlq_table` is None — every existing code path is bit-for-bit identical. + +Atomicity is via a single Postgres CTE: `OutboxClient.delete_with_lease` switches to `WITH deleted AS (DELETE … RETURNING …) INSERT INTO SELECT … FROM deleted` when configured. One statement preserves the writer-connection autocommit fast path and the lease-token guard; INSERT failure rolls back the DELETE, so the outbox row stays leased and is reclaimed when the lease expires — DLQ misconfiguration surfaces as outbox-table growth + `lease_lost` spikes rather than silent audit loss. Identifiers are quoted via the dialect's `identifier_preparer`; values flow through bind params. + +`OutboxInnerMessage.terminal_failure_reason` is set on the three failure paths (`allow_delivery` False → `"max_deliveries"`, `_nack` strategy-exhausted → `"retry_terminal"`, `_reject` → `"rejected"`). `_flush_terminal` reads it to decide whether to build a DLQ payload; `dispatch_one` also reads it to pick the `nacked_terminal(reason=…)` tag value — branch on `terminal_failure_reason` BEFORE `last_exception`, so manual `await msg.reject()` (no exception raised) routes correctly to `nacked_terminal(reason="rejected")` instead of the previously-incorrect `acked`. Success (`_ack`) leaves the field None; success rows never touch the DLQ. **The `DLQFailureReason` `Literal` type (in `message.py`) is the public contract** for this string — operator queries and dashboard labels key off these values, so changing them is API-breaking. + +`last_exception` is serialized via `repr()` and bounded by `_LAST_EXCEPTION_MAX_CHARS=8192` in `subscriber/usecase.py`. Some exceptions carry MB-scale payloads (validation errors with the full request body, `asyncpg.DataError` with the rejected row); an unbounded `repr` would extend the writer round-trip on a poison row and bloat the DLQ. Truncation appends `…[truncated]`. The DLQ `failure_reason` column is `String(64)` (current literals fit in 14 bytes; the breathing room lets the canonical set grow without a column-widening migration). + +There is no built-in retention/pruning. Operators are responsible for archival — suggested pattern: partition the DLQ by `failed_at` and drop old partitions via a cron job. + +User-facing reference: `docs/usage/dlq.md`. `validate_schema()` delegates to `alembic.autogenerate.compare_metadata` against a throwaway `MetaData` populated by `make_outbox_table(...)` — so the canonical `Table` is the single source of truth and the validator never duplicates the schema declaration. It only flags **missing** schema (`add_*` / `modify_*` ops); `remove_*` ops are intentionally ignored so users may attach extras (audit columns, their own indexes). Alembic is an **optional dependency** (`faststream-outbox[validate]`); without it, `validate_schema()` raises `ImportError`, but every other code path works (the import lives at the top of `client.py` inside a try/except, with module-level sentinels `_alembic_compare_metadata` / `_AlembicMigrationContext` set to `None` on failure). @@ -131,7 +145,7 @@ The caller owns the `AsyncEngine`. `OutboxBrokerConfig.disconnect()` deliberatel ### Metrics seam (`metrics/__init__.py`) -`OutboxBroker(..., metrics_recorder=...)` accepts a `MetricsRecorder = Callable[[str, Mapping[str, Any]], None]`. The default (`_noop_recorder`) lets instrumentation sites call unconditionally. The recorder threads through `OutboxBrokerConfig.metrics_recorder` to two places: the subscriber's six emission points (`fetched`, `dispatched`, `acked`, `nacked_retried`, `nacked_terminal`, `lease_lost`) via `OutboxSubscriber._emit_metric`, and the producer's single emission point (`published`) via `OutboxProducer._emit_metric`. The producer reads the recorder from its own constructor kwarg (passed in alongside the config field) so the canonical insert path doesn't have to reach through the broker config at call time. +`OutboxBroker(..., metrics_recorder=...)` accepts a `MetricsRecorder = Callable[[str, Mapping[str, Any]], None]`. The default (`_noop_recorder`) lets instrumentation sites call unconditionally. The recorder threads through `OutboxBrokerConfig.metrics_recorder` to two places: the subscriber's emission points (`fetched`, `dispatched`, `acked`, `nacked_retried`, `nacked_terminal`, `lease_lost`, plus `dlq_written` when `dlq_table` is configured) via `OutboxSubscriber._emit_metric`, and the producer's single emission point (`published`) via `OutboxProducer._emit_metric`. The producer reads the recorder from its own constructor kwarg (passed in alongside the config field) so the canonical insert path doesn't have to reach through the broker config at call time. `dlq_written` and `nacked_terminal` are complementary — alert on a divergence between the two rates to catch DLQ misconfiguration without silent audit loss. Every call site wraps the recorder in `try/except` and logs at DEBUG — a broken recorder never poisons the dispatch loop. The recorder is called from the event loop and **must not block**; sync `Counter.inc()` is fine, blocking HTTP/StatsD calls are not. The library does not wrap user recorders in `asyncio.to_thread` — that would destroy ordering and create per-event task explosion. diff --git a/README.md b/README.md index aa8be0a..a1792ef 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ async with session_factory() as session, session.begin(): ## How it works -A subscriber owns two async loops: a **fetch** loop claims available rows via a single CTE (`SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *`), and `max_workers` **worker** loops dispatch to the handler. On success, `DELETE WHERE id=:id AND acquired_token=:token`; on failure, the retry strategy schedules another attempt or terminally drops the row. +A subscriber owns two async loops: a **fetch** loop claims available rows via a single CTE (`SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *`), and `max_workers` **worker** loops dispatch to the handler. On success, `DELETE WHERE id=:id AND acquired_token=:token`; on failure, the retry strategy schedules another attempt or terminally drops the row. Terminal failures `DELETE` by default; pass `dlq_table=make_dlq_table(metadata)` to atomically archive them into a sibling audit table instead — see [Dead-letter queue](https://faststream-outbox.readthedocs.io/en/latest/usage/dlq/). The `acquired_token` is the load-bearing invariant: a slow handler whose lease expired and was re-claimed by another worker finds its terminal `DELETE` to be a no-op (the token no longer matches), preventing it from clobbering the new lease holder. @@ -52,7 +52,7 @@ See [How it works](https://faststream-outbox.readthedocs.io/en/latest/introducti ## Acknowledgements -The architecture of this package is heavily informed by Arseniy Popov's [PR #2704](https://github.com/ag2ai/faststream/pull/2704) (`feat: add sqla broker`) on upstream FastStream — the FastStream broker/registrator/subscriber wiring, the `SELECT … FOR UPDATE SKIP LOCKED` fetch-and-claim CTE, the retry strategy hierarchy, and the in-transaction publish contract all originate from there. This package is a Postgres-only reimplementation that diverges in storage model (lease tokens instead of an explicit state column, no archive table), loop structure (two loops instead of four), wake-up mechanism (`LISTEN/NOTIFY`), and adds timer mechanics. Credit for the original design belongs to Arseniy. +The architecture of this package is heavily informed by Arseniy Popov's [PR #2704](https://github.com/ag2ai/faststream/pull/2704) (`feat: add sqla broker`) on upstream FastStream — the FastStream broker/registrator/subscriber wiring, the `SELECT … FOR UPDATE SKIP LOCKED` fetch-and-claim CTE, the retry strategy hierarchy, and the in-transaction publish contract all originate from there. This package is a Postgres-only reimplementation that diverges in storage model (lease tokens instead of an explicit state column, archive table is opt-in), loop structure (two loops instead of four), wake-up mechanism (`LISTEN/NOTIFY`), and adds timer mechanics. Credit for the original design belongs to Arseniy. ## Part of `modern-python` diff --git a/docs/index.md b/docs/index.md index 0d77c63..72159bf 100644 --- a/docs/index.md +++ b/docs/index.md @@ -12,6 +12,7 @@ A producer writes a domain entity and an outbox row in the *same* SQLAlchemy tra - [How it works](introduction/how-it-works.md) - [Basic usage](usage/basic.md) - [Subscriber](usage/subscriber.md) +- [Dead-letter queue](usage/dlq.md) - [Publisher](usage/publisher.md) - [Router](usage/router.md) - [FastAPI integration](usage/fastapi.md) diff --git a/docs/usage/dlq.md b/docs/usage/dlq.md new file mode 100644 index 0000000..d3d65fe --- /dev/null +++ b/docs/usage/dlq.md @@ -0,0 +1,208 @@ +# Dead-letter queue + +Opt-in audit for terminal failures. Pass `dlq_table=make_dlq_table(metadata)` +to the broker and every row that fails terminally is copied into the DLQ in +the same Postgres statement as the outbox `DELETE`. Default behavior is +unchanged when `dlq_table` is omitted — no audit table, no new code paths. + +## Quickstart + +Build the DLQ `Table` on the same `MetaData` as your outbox table so +Alembic discovers both, then wire it into the broker: + +```python +from sqlalchemy import MetaData +from sqlalchemy.ext.asyncio import create_async_engine + +from faststream_outbox import OutboxBroker, make_dlq_table, make_outbox_table + + +metadata = MetaData() +outbox_table = make_outbox_table(metadata, table_name="outbox") +dlq_table = make_dlq_table(metadata, table_name="outbox_dlq") + +engine = create_async_engine("postgresql+asyncpg://outbox:outbox@localhost:5432/outbox") +broker = OutboxBroker(engine, outbox_table=outbox_table, dlq_table=dlq_table) +``` + +The package does not create or migrate the table — run `metadata.create_all` +(or your Alembic migration) once both tables are declared. Subscribers and +publishers need no further configuration; the broker reads `dlq_table` from +its own config when it builds the terminal-flush SQL. + +## What gets archived + +A row lands in the DLQ when it is **terminal-by-failure**, i.e. the +subscriber's terminal flush would otherwise `DELETE` the row because of a +failure (not a clean ack). Three paths produce that: + +| `failure_reason` | Trigger | +|---|---| +| `max_deliveries` | `deliveries_count > max_deliveries` — handler is never invoked for this attempt. | +| `retry_terminal` | Handler raised; the retry strategy returned `None` (attempts / total-delay exhausted, or `NoRetry()`). | +| `rejected` | Handler called `await msg.reject()` directly, or `AckPolicy.REJECT_ON_ERROR` rejected the row on an exception. | + +Successful rows (`ack`) are never archived; the success path stays a plain +`DELETE` and never touches the DLQ. See +[Ack policy](./subscriber.md#ack-policy) and +[Retry strategies](./subscriber.md#retry-strategies) for the upstream +behaviors that decide which reason fires. + +## Schema reference + +`make_dlq_table(metadata, table_name="outbox_dlq")` declares: + +| Column | Type | Notes | +|---|---|---| +| `id` | `BigInteger`, PK, autoincrement | DLQ row identity. | +| `original_id` | `BigInteger`, not null | The outbox row's id, for operator forensics. Not unique — a re-delivered `timer_id` row could legitimately land here twice. | +| `queue` | `String(255)`, not null | Source queue name. | +| `payload` | `LargeBinary`, not null | Verbatim copy of the outbox payload bytes. | +| `headers` | `JSONB`, nullable | Verbatim copy, including the inherited `correlation_id`. | +| `deliveries_count` | `BigInteger`, not null | Attempt count at the moment of failure. | +| `created_at` | `DateTime(timezone=True)`, not null | The outbox row's original `created_at` — measures time-to-terminal-failure. | +| `failed_at` | `DateTime(timezone=True)`, not null, default `now()` | When the audit row was written. | +| `failure_reason` | `String(64)`, not null | One of the three values in the table above. | +| `last_exception` | `String`, nullable | `repr()` of the raised exception, bounded at 8 KiB (see below). `None` on manual `reject()` without an exception. | + +Index: `(queue, failed_at)` (btree, non-unique) — supports "show me recent +failures for queue X" queries without a sequential scan as the DLQ grows. + +No foreign key references the outbox table: the source row is gone in the +same transaction, so the constraint would be unsatisfiable. There is also no +`LISTEN/NOTIFY` channel — nobody polls the DLQ. + +## Atomicity + +When `dlq_table` is configured, `OutboxClient.delete_with_lease` switches to +a single CTE statement: + +```sql +WITH deleted AS ( + DELETE FROM WHERE id = :id AND acquired_token = :token + RETURNING id, queue, payload, headers, deliveries_count, created_at +) +INSERT INTO (original_id, queue, payload, headers, deliveries_count, + created_at, failure_reason, last_exception) +SELECT id, queue, payload, headers, deliveries_count, created_at, + :failure_reason, :last_exception +FROM deleted; +``` + +Two operator-visible properties fall out of this shape: + +- **Lease-lost is a transparent no-op.** If another worker reclaimed the + row after a lease expiry, `WHERE acquired_token = :token` matches + nothing, `deleted` is empty, the INSERT inserts zero rows, and the + caller sees `rowcount == 0` — same observable as the no-DLQ path. The + lease-token guard documented in [Subscriber](./subscriber.md) is + preserved. +- **DLQ-write failure rolls back the DELETE.** If the INSERT fails + (column mismatch, disk full, ENUM violation), the whole statement + rolls back. The outbox row stays leased and is reclaimed when the + lease expires. Misconfiguration surfaces as outbox growth plus + `lease_lost` spikes rather than silent audit loss. + +The statement runs on the worker's autocommit writer connection — one +round-trip per terminal flush, same cost as the no-DLQ path. + +## `last_exception` truncation + +The serialized exception (`repr(exc)`) is bounded at 8 KiB by +`_LAST_EXCEPTION_MAX_CHARS` in `faststream_outbox/subscriber/usecase.py`. +Anything longer is truncated and `…[truncated]` appended. + +Rationale: some exceptions carry MB-scale payloads — pydantic validation +errors with the rejected request body, asyncpg `DataError` with the full +row, etc. An unbounded `repr` would extend the writer round-trip on a +poison row by hundreds of milliseconds and bloat the DLQ table. 8 KiB +preserves the traceback and any structured detail while bounding worst +case. + +## Schema validation + +When `dlq_table` is set, `await broker.validate_schema()` checks both +tables and surfaces missing columns / indexes on either one. The DLQ +table is validated independently — drift in one table does not mask drift +in the other. See [Schema validation](./schema-validation.md) for the +opt-in install + `/health` pattern. + +## Metric: `dlq_written` + +`_flush_terminal` emits a `dlq_written` recorder event after the CTE +commits successfully. Skipped on the lease-lost path (no audit row was +written, so nothing to count). + +Tags: + +| Tag | Notes | +|---|---| +| `queue` | Source queue. | +| `subscriber` | Subscriber handler name (`call_name`). | +| `deliveries_count` | Attempt count at terminal flush. | +| `failure_reason` | Same value set as the schema column. | +| `exception_type` | Present only when `last_exception` was set (omitted for `max_deliveries` and manual `reject()` without an exception). | + +The bundled adapters surface the event without further wiring: + +- **Prometheus**: counter `faststream_outbox_dlq_written_total{reason}`. +- **OpenTelemetry**: counter `messaging.outbox.dlq_written` with the + `messaging.outbox.dlq_reason` attribute and the standard + `error.type` attribute when present. + +Pair with `nacked_terminal` to alert on DLQ misconfiguration: every +terminal-failure row should produce one `nacked_terminal` *and* one +`dlq_written`. A persistent divergence (terminal rate > DLQ rate) means +either the CTE keeps rolling back (DLQ schema drift) or the lease keeps +expiring before flush (`lease_ttl_seconds` too low for handler P99) — +both are operator-actionable signals. See +[Observability](./observability.md) for the broader recorder + middleware +story. + +## Retention + +There is no built-in pruning. Operators are responsible for archival or +expiry. + +Recommended pattern: partition the DLQ by `failed_at` (monthly or +weekly) and drop old partitions via a cron job. The `(queue, failed_at)` +index already supports partition pruning in operator queries; convert it +to a partitioned table at create time if you expect a steady DLQ +inflow. + +For low-volume DLQs a plain `DELETE FROM WHERE failed_at < now() - +interval '90 days'` from a daily cron is enough. + +## Test broker + +`TestOutboxBroker` accumulates audit rows in +`broker.fake_client.dlq_rows` so tests can assert on archive content +without a real Postgres. The fake mirrors the production CTE +side-effect: the source row is removed from `fake_client.rows` and an +audit dict is appended to `fake_client.dlq_rows` in the same call. + +```python +from faststream_outbox import NoRetry, OutboxBroker, TestOutboxBroker, make_dlq_table, make_outbox_table + + +metadata = MetaData() +outbox_table = make_outbox_table(metadata, table_name="outbox") +dlq_table = make_dlq_table(metadata, table_name="outbox_dlq") +broker = OutboxBroker(outbox_table=outbox_table, dlq_table=dlq_table) + + +@broker.subscriber("orders", retry_strategy=NoRetry()) +async def handle(body: dict) -> None: + raise RuntimeError("boom") + + +test_broker = TestOutboxBroker(broker) +async with test_broker: + await broker.publish({"order_id": 1}, queue="orders") + +assert test_broker.fake_client.rows == [] +assert len(test_broker.fake_client.dlq_rows) == 1 +assert test_broker.fake_client.dlq_rows[0]["failure_reason"] == "retry_terminal" +``` + +See [Testing](./testing.md) for the broader test-broker contract. diff --git a/mkdocs.yml b/mkdocs.yml index e4b7425..9786c17 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -9,6 +9,7 @@ nav: - Usage: - Basic usage: usage/basic.md - Subscriber: usage/subscriber.md + - Dead-letter queue: usage/dlq.md - Publisher: usage/publisher.md - Router: usage/router.md - FastAPI integration: usage/fastapi.md