Event Queues#2608
Conversation
| * **Inbox** → for asynchronously handling inbound requests | ||
| * **Background tasks** → e.g., scheduled periodically | ||
| * **Remote Callbacks** → implementing SAGA patterns | ||
| @Before(entity = DeadOutboxMessages_.CDS_NAME) |
There was a problem hiding this comment.
Should be:
@before(event = CqnService.EVENT_READ, entity = DeadOutboxMessages_.CDS_NAME)
The handler takes CdsReadEventContext and modifies a CqnSelect query, so it's clearly intended for READ operations. Without event, the annotation matches all events, which is misleading.
| Enable the inbox in your configuration: | ||
|
|
||
| ::: code-group | ||
| ```json [Node.js — package.json] |
There was a problem hiding this comment.
Consider adding a brief note highlighting differences between inboxed and inbox.enabled
| const xflights = cds.unqueued(qd_xflights) | ||
| ``` | ||
| ```java [Java] | ||
| CqnService xflights = outbox.unboxed(outboxedXFlights); |
There was a problem hiding this comment.
The unboxed() method appears to be a static method on OutboxService:
CqnService xflights = OutboxService.unboxed(outboxedXFlights);
| target : String; // Target service/queue name | ||
| msg : LargeString; // Serialized event payload | ||
| attempts : Integer default 0; // Number of processing attempts | ||
| partition : Integer default 0; |
There was a problem hiding this comment.
add comment like "// reserved"
| ::: details When is a message picked up next? | ||
| A pending message is *processable* when all three conditions hold: | ||
|
|
||
| 1. Its scheduled timestamp plus the retry backoff (`attempts × <exponential factor>`) is in the past. |
There was a problem hiding this comment.
The notation attempts × is potentially misleading. The number of attemps is already in - no need to multiply. If multiplying, then it looks like linear function.
exponential_factor(attempts)
or simply
exponential backoff based on attempts
| const xflights = await cds.connect.to('xflights') | ||
|
|
||
| // Called when the queued booking succeeds | ||
| xflights.after('bookFlight/#succeeded', async (result, req) => { |
There was a problem hiding this comment.
it's not explained what result contains:
The return value of the event handler OR
The response object
| lastAttemptTimestamp : Timestamp; // When last attempt occurred | ||
| status : String(23); // Current processing status | ||
| task : String(255); // Task name for named/singleton tasks | ||
| appid : String(255); // Application ID for shared HDI containers |
There was a problem hiding this comment.
The comment is quite terse. Consider expanding it to clarify the purpose or add a sentence after the data model explaining like "appid enables multiple applications to share the same outbox table without task name collisions"
| }) | ||
| ``` | ||
| ```java [Java] | ||
| @On |
| context.setCompleted(); | ||
| } | ||
|
|
||
| @On |
|
|
||
| | Option | Default | Description | | ||
| |--------|---------|-------------| | ||
| | `maxAttempts` | `20` | Maximum retries before a message becomes a dead letter | |
There was a problem hiding this comment.
maxAttempts is different for node and java, add to "Stack Differences at a Glance" table
|
|
||
| ::: details Configuration options for Java | ||
|
|
||
| | Option | Default | Description | |
There was a problem hiding this comment.
Java example shows storeLastError: true, but storeLastError is not listed in the Java configuration options
| | `maxAttempts` | `20` | Maximum retries before a message becomes a dead letter | | ||
| | `chunkSize` | `10` | Number of messages to process per batch | | ||
| | `storeLastError` | `true` | Store error information of the last failed attempt | | ||
| | `timeout` | `"1h"` | Time after which a `processing` message is considered abandoned | |
There was a problem hiding this comment.
abandoned and eligible for reprocessing ?
|
|
||
| ### Inbox | ||
|
|
||
| The inbox mirrors the *'Outbox'* pattern for inbound messages. |
There was a problem hiding this comment.
Above (line 115): This pattern is widely known as the 'Transactional Outbox',
| Event queues are not limited to messaging. | ||
| You can schedule arbitrary background tasks such as data replication, cache refresh, or garbage collection. | ||
|
|
||
| **Example:** Replicate airport master data from the xflights service every 10 minutes. |
|
|
||
| This stores the flight booking request in the database together with the travel creation. CAP dispatches it later in the background. If the transaction rolls back, no booking request is sent. | ||
|
|
||
| The `xflights` connection in the example stands in for any remote service you've configured under `cds.requires`. The complete setup, including the *XTravels* application and the *xflights* service it consumes, lives in the [@capire/xtravels](https://github.com/capire/xtravels) sample. |
| ### Error Handling | ||
|
|
||
| When processing fails, the system retries the message with exponentially increasing delays. | ||
| After a configurable maximum number of attempts, the message is moved to the dead letter queue. |
There was a problem hiding this comment.
not moved, just considered as dead message
No description provided.