feat: Kafka deliverBatch fire-flush-await — 13-41x throughput (KOJAK-73)#40
Open
endrju19 wants to merge 8 commits into
Conversation
Overrides KafkaMessageDeliverer.deliverBatch with the fire-flush-await pattern.
Replaces N sequential producer.send().get() round-trips per batch with one
batched network round-trip:
Phase 1 (fire): producer.send() per entry — non-blocking; records queue in the producer's internal buffer
Phase 2 (flush): one producer.flush() — bypasses linger.ms, drains all in-flight records to the broker in one batched RTT
Phase 3 (await): Future.get() per entry — non-blocking after flush, just reads the settled completion
Per-entry classification preserved via SendOutcome sealed type — synchronous
exceptions in send() (BufferExhaustedException, SerializationException) are
caught individually so one failing send never aborts the batch. flush()
InterruptException restores the interrupt flag and lets remaining futures
surface their own outcome.
deliver() refactored to share buildRecord/classifyException helpers with
deliverBatch — no behavior change for single-entry path.
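The three phases above can be sketched in a few lines. This is a minimal, self-contained illustration, not the PR's actual code: `FireFlushAwaitDeliverer`, the `send`/`flush` lambdas, and the simplified result types are hypothetical stand-ins, and `CompletableFuture` plays the role of the producer's `Future<RecordMetadata>`.

```kotlin
import java.util.concurrent.CompletableFuture

// Simplified stand-ins for the real DeliveryResult / SendOutcome types.
sealed interface DeliveryResult {
    object Success : DeliveryResult
    data class PermanentFailure(val reason: String) : DeliveryResult
}

sealed interface SendOutcome {
    data class ImmediateFailure(val result: DeliveryResult) : SendOutcome
    data class Sent(val future: CompletableFuture<Unit>) : SendOutcome
}

class FireFlushAwaitDeliverer(
    private val send: (String) -> CompletableFuture<Unit>, // stand-in for producer.send
    private val flush: () -> Unit,                         // stand-in for producer.flush
) {
    fun deliverBatch(entries: List<String>): List<DeliveryResult> {
        if (entries.isEmpty()) return emptyList()
        // Phase 1 (fire): a synchronous throw is captured per entry,
        // so one failing send never aborts the batch.
        val outcomes: List<SendOutcome> = entries.map { entry ->
            try {
                SendOutcome.Sent(send(entry))
            } catch (e: Exception) {
                SendOutcome.ImmediateFailure(
                    DeliveryResult.PermanentFailure(e.message ?: "send failed"),
                )
            }
        }
        // Phase 2 (flush): one blocking drain for the whole batch.
        runCatching { flush() }
        // Phase 3 (await): futures are settled once flush returns, so get() just reads.
        return outcomes.map { outcome ->
            when (outcome) {
                is SendOutcome.ImmediateFailure -> outcome.result
                is SendOutcome.Sent -> try {
                    outcome.future.get()
                    DeliveryResult.Success
                } catch (e: Exception) {
                    DeliveryResult.PermanentFailure(e.cause?.message ?: "async failure")
                }
            }
        }
    }
}
```

The key property this models: per-entry classification survives both synchronous send failures and anything flush() does, and the output list is always the same size and order as the input.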
Benchmark results vs KOJAK-68 baseline (smoke run, MacBook M3 Max,
Postgres 16 + Kafka 3.8.1 in Testcontainers):
| batchSize | baseline | KOJAK-73 | improvement |
|---|---|---|---|
| 10 | ~109 msg/s | ~1,468 msg/s | 13.5x |
| 50 | ~115 msg/s | ~3,731 msg/s | 32.3x |
| 100 | ~115 msg/s | ~4,717 msg/s | 41.0x |
Throughput now scales with batchSize (it was flat at ~115 msg/s pre-KOJAK-73), proving Kafka's internal record batching is being exploited. Sublinear scaling from 50 -> 100 indicates the per-entry DB UPDATE is becoming the next bottleneck — exactly what KOJAK-75 (batch UPDATE via executeBatch) addresses next.
Tests:
- KafkaMessageDelivererBatchTest covers empty input, all-success ordering,
flush-counted-once verification, synchronous Permanent + Retriable send
exceptions, and future-based async exception (driven via MockProducer
override that completeNext/errorNext per position inside flush)
- Existing KafkaMessageDelivererTest unchanged — single-entry path still
works identically
- Integration tests (real Kafka via Testcontainers) pass
Documents results in benchmarks/results-postopt-kojak-73.md and bumps
README headline numbers to reflect new Kafka throughput.
…s (KOJAK-73)

Addresses cross-cutting findings from the PR #40 review (5 agents converged on the interrupt/flush error handling).

Critical fix — flush() / interrupt path:
- Broaden flush() exception handling: previously only InterruptException was caught, so a fatal KafkaException or IllegalStateException (e.g. a closed producer) would propagate out of deliverBatch and abandon all in-flight futures uncollected, contradicting the documented per-entry classification contract. Now any exception from flush() is logged and swallowed; awaitOne classifies each entry from its own future state.
- Add explicit InterruptedException handling in awaitOne and deliver. Without this, an interrupt during flush() would translate into PermanentFailure on pending futures (java.lang.InterruptedException is NOT a Kafka RetriableException), incorrectly marking transient interrupts as terminal. Now classified as RetriableFailure with the interrupt flag re-armed.

Important — observability:
- Add an SLF4J logger (implementation dep on slf4j-api added to okapi-kafka, consistent with okapi-core's pattern). Logs warn on flush failure (with batch size) and debug on synchronous send rejection (with outboxId).
- The classifyException fallback for null exception messages now uses e.javaClass.simpleName instead of the generic "Permanent/Retriable Kafka error", preserving debuggability when an exception's message is null.

Type design polish:
- SendOutcome.Sent is now a @JvmInline value class instead of a data class — Future has reference-only equals, which a data class would falsely advertise. Zero runtime cost, removes the misleading promise.

Comments:
- Condensed the class-level KDoc; removed the unverified "since Kafka 3.0" claim about the exception hierarchy.
- Reworded the deliverBatch KDoc to drop the numbered-step restatement and clarify that flush() failures never abandon the batch.
- Trimmed redundant test inline comments per the "no redundant comments" rule.
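The interrupt-handling rule described above can be sketched as follows. This is a hypothetical, simplified version — `Outcome` and `classifyAwaitException` are stand-in names; only the `e.javaClass.simpleName` fallback and the re-armed interrupt flag mirror the change described.

```kotlin
// Simplified stand-ins for the real failure types.
sealed interface Outcome {
    data class RetriableFailure(val reason: String) : Outcome
    data class PermanentFailure(val reason: String) : Outcome
}

fun classifyAwaitException(e: Exception): Outcome = when (e) {
    is InterruptedException -> {
        // Re-arm the flag so callers up the stack still observe the interrupt,
        // and classify the entry as transient rather than terminal.
        Thread.currentThread().interrupt()
        Outcome.RetriableFailure("interrupted during flush/await")
    }
    // Fallback preserves the class name when the message is null.
    else -> Outcome.PermanentFailure(e.message ?: e.javaClass.simpleName)
}
```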
New tests (cover gaps the reviews flagged):
- mixed sync-throw + async outcomes in one batch with positional integrity
- poison-pill malformed deliveryMetadata in the fire phase
- flush() throws a non-Interrupt exception → per-entry classification preserved
- flush() throws InterruptException → pending futures classified as Retriable

Verification:
- All unit tests pass (4 new + existing batch + single-entry tests)
- All integration tests pass (real Postgres + Kafka via Testcontainers)
- ktlint clean
- Smoke benchmark, Kafka batchSize=50: 0.260 ms/op vs 0.268 pre-review (within noise; logger overhead is on failure paths only, the happy path is unaffected). The 32x improvement vs the KOJAK-68 baseline is preserved.
…xception classification, file cleanup (KOJAK-73)

- Tighten the MessageDeliverer.deliverBatch KDoc: normative contract for never-throw + per-entry classification + same-size guarantee
- Add real-Kafka integration tests for deliverBatch (25-entry batch ordering + empty-batch short-circuit)
- Classify bare ExecutionException (null cause) as Retriable rather than Permanent (extract a classifyExecutionException helper)
- Fix logger.debug stack-trace loss in fireOne (pass the exception, not toString())
- Drop @JvmInline from SendOutcome.Sent — the inline optimization is defeated when the type is used inside sealed-interface when/is branches
- Add Kafka InterruptException tests covering the deliver() and deliverBatch sync send paths
- Rename benchmark files from kojak-73-* to descriptive technical names; strip KOJAK refs from the README
This was referenced May 14, 2026
endrju19 added a commit that referenced this pull request on May 14, 2026
…e failures (#44)

## Summary

`HttpMessageDeliverer.deliver()` previously caught all exceptions as `RetriableFailure`. This meant **corrupt delivery metadata or an unknown service triggered an infinite retry loop** instead of moving the entry to `FAILED`. This PR classifies exceptions explicitly:

| Exception | Classification | Why |
|---|---|---|
| `JsonProcessingException` | `PermanentFailure` | Corrupt metadata won't fix itself (caught before `IOException` — it's a subtype) |
| `IOException` | `RetriableFailure` | Connection / timeout — transient |
| `InterruptedException` | `RetriableFailure` | Interrupt flag restored; consistent with `KafkaMessageDeliverer` |
| other `Exception` | `PermanentFailure` | Malformed URI, unknown service, `IllegalStateException` from `ServiceUrlResolver`, etc. |

## Why this matters

Before: an `okapi-http` consumer with malformed JSON in `deliveryMetadata` → infinite retries → retry storm in the logs, entry never marked `FAILED`, operator alert fatigue. After: corrupt input → `PermanentFailure` → entry moves to `FAILED` per `RetryPolicy`, ops visibility preserved.

## Scope

- `HttpMessageDeliverer.kt`: refactor to expression-body, granular `catch` blocks, updated KDoc
- `HttpMessageDelivererTest.kt`: 2 new tests
  - corrupt metadata → `PermanentFailure` (does not throw)
  - `urlResolver` throwing → `PermanentFailure` (does not throw)

Independent of the Kafka `deliverBatch` work in PR #40.

## Test plan

- [x] `./gradlew :okapi-http:test ktlintCheck` — all 13 unit tests pass (11 existing + 2 new)
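The catch ordering the table depends on can be shown with a self-contained sketch. `JsonParseErr` and `classifyHttpException` are hypothetical stand-ins — `JsonParseErr` mimics Jackson's `JsonProcessingException`, which really is an `IOException` subtype, so the more specific arm must come first or the `IOException` arm would swallow it.

```kotlin
import java.io.IOException

// Stand-in for Jackson's JsonProcessingException (an IOException subtype).
class JsonParseErr(msg: String) : IOException(msg)

fun classifyHttpException(e: Exception): String = when (e) {
    is JsonParseErr -> "PermanentFailure"          // corrupt metadata won't fix itself
    is IOException -> "RetriableFailure"           // connection / timeout — transient
    is InterruptedException -> {
        Thread.currentThread().interrupt()         // restore the flag
        "RetriableFailure"
    }
    else -> "PermanentFailure"                     // malformed URI, unknown service, ...
}
```

Swapping the first two arms would silently reclassify corrupt metadata as retriable — exactly the infinite-retry bug this PR fixes.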
ramafasa reviewed on May 14, 2026
```kotlin
private fun awaitOne(outcome: SendOutcome): DeliveryResult = when (outcome) {
    is SendOutcome.ImmediateFailure -> outcome.result
    is SendOutcome.Sent -> try {
        outcome.future.get()
```
endrju19 (Author):
Bare get() is safe here: Kafka's Sender already bounds settlement via delivery.timeout.ms (default 120 s), and flush() only returns once every Future is settled. Users tune it via the producer config they inject.
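For context, the bound this reply relies on is the standard producer setting `delivery.timeout.ms`. A minimal illustration using plain string keys (so no kafka-clients import is needed — the key name is the real config key, the value shown is its documented default):

```kotlin
import java.util.Properties

// Injected producer config; every send settles or fails within this window.
val producerProps = Properties().apply {
    put("delivery.timeout.ms", "120000") // default 120 s
}
```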
ramafasa approved these changes on May 14, 2026
…s Pair<OutboxEntry, DeliveryResult> (#45)

## Summary

Replaces the anonymous tuple `List<Pair<OutboxEntry, DeliveryResult>>` on `MessageDeliverer.deliverBatch` with a named domain type, `List<DeliveryOutcome>`. Same shape (two fields), better contract.

## Why

The batch-delivery result is a domain concept, not a utility tuple — it deserves a name. Three concrete benefits:

| Aspect | `Pair<OutboxEntry, DeliveryResult>` | `DeliveryOutcome` |
|---|---|---|
| Kotlin access | `pair.first` / `pair.second` | `outcome.entry` / `outcome.result` |
| Java access | `pair.getFirst()` / `pair.getSecond()` | `outcome.getEntry()` / `outcome.getResult()` |
| Extensibility | Locked at 2 anonymous fields | Optional `attemptNumber`, `latencyMs`, ... addable later |
| Domain modeling | Tuple | Named concept consistent with `DeliveryResult` / `DeliveryInfo` / `MessageDeliverer` |

## Why pre-release

Currently `0.2.0-SNAPSHOT`, no external consumers. Renaming after release would be a breaking change in a public API. The cost now is ~30 minutes; the cost later is a major version bump.

## Scope

- **New type:** `okapi-core/.../DeliveryOutcome.kt` — `data class DeliveryOutcome(val entry: OutboxEntry, val result: DeliveryResult)`
- **Interface:** `MessageDeliverer.deliverBatch` returns `List<DeliveryOutcome>`
- **`CompositeMessageDeliverer`:** builds `DeliveryOutcome` instances; lookup map via `.associate { it.entry to it.result }`
- **`KafkaMessageDeliverer`:** the public override constructs `DeliveryOutcome` on emission; the private `SendOutcome` sealed type is untouched
- **Tests:** `.first`/`.second` → `.entry`/`.result`; destructuring `(entry, result) -> ...` patterns unchanged (data-class `componentN` keeps working)
- **`OutboxEntryProcessor`:** consumes via destructuring — zero changes

## Base branch

Based on `feature/kojak-73-kafka-deliver-batch` (PR #40) — cumulative with that work. Merge order: PR #40 first, then this one.

## Test plan

- [x] `./gradlew clean ktlintFormat ktlintCheck build -x :okapi-integration-tests:test` — all modules build, full unit suite passes
- [x] `:okapi-integration-tests:compileTestKotlin` — integration test compiles against the new signature
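A sketch of the rename with simplified field types (`String` stands in for `OutboxEntry` / `DeliveryResult`), showing why destructuring-based call sites need zero changes — data classes generate `componentN` operators:

```kotlin
// Named domain type replacing Pair<OutboxEntry, DeliveryResult>;
// field types simplified for illustration.
data class DeliveryOutcome(val entry: String, val result: String)

// Lookup map as described in the Scope section.
fun lookup(outcomes: List<DeliveryOutcome>): Map<String, String> =
    outcomes.associate { it.entry to it.result }
```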
…a-deliver-batch # Conflicts: # README.md
endrju19 added a commit that referenced this pull request on May 17, 2026
#48)

## Summary

Three independent fixes that make `./gradlew :okapi-benchmarks:jmh` complete cleanly. Before these, the JMH run OOMs partway through. All three issues exist on main today; running the benchmark suite without these fixes will fail. No test or production code is touched — pure benchmark infrastructure.

## Fixes

### 1. Bump the JMH JVM heap to `-Xmx8g`

Throughput-mode microbenchmarks call `deliver()` at ~1M ops/s; each call allocates Jackson + Kotlin reflection state for JSON deserialization. At the previous default `-Xmx2g` the allocation rate exceeds GC throughput and OOMs within the first measurement iteration.

### 2. Pass `-Dliquibase.duplicateFileMode=WARN` as a JMH JVM arg

`okapi-postgres.jar` and the fat JMH jar both ship the changelog at the same classpath path (`com/softwaremill/okapi/db/postgres/changelog.xml`). Liquibase 4.x treats duplicate resources as an error by default, which aborts `PostgresBenchmarkSupport` setup. The two files are identical (the same jar source on the classpath twice), so `WARN` is safe.

### 3. Subclass `MockProducer` in `DelivererMicroBenchmark` to `clear()` history after every `send()`

`MockProducer.history` (the internal `sent` list) retains every record sent for inspection — there is no eviction. In throughput mode at ~1M ops/s for 30s × forks × iterations, that list grew to GBs and OOMed the JVM regardless of heap size. Discarding per call is safe because the microbench doesn't inspect what was sent — only timing. With this fix, `DelivererMicroBenchmark.kafkaDeliver` now produces meaningful numbers (~2.3M ops/s, error <1%) instead of `error > score`.

## Files

- `okapi-benchmarks/build.gradle.kts` — JVM args
- `okapi-benchmarks/src/jmh/kotlin/.../DelivererMicroBenchmark.kt` — MockProducer override

## Why a separate PR

These are pure infrastructure fixes — completely independent of any specific benchmark or transport implementation. Carved out from PR #46 (KOJAK-82) so they can land on main right away, without waiting for the Kafka deliverBatch (#40) review cycle. PR #46 will then contain only the refreshed JMH numbers.

## Test plan

- [x] `./gradlew :okapi-benchmarks:compileJmhKotlin` passes
- [x] Verified locally: the full `./gradlew :okapi-benchmarks:jmh` run completes with `BUILD SUCCESSFUL` and no OOM
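The history-clearing idea in fix 3 can be illustrated with a generic stand-in. The real change overrides Kafka's `MockProducer`; `RecordingProducer` and `NonRetainingProducer` here are hypothetical types that only demonstrate the retain-vs-discard behavior.

```kotlin
// Stand-in for a test producer that keeps every sent record for inspection.
open class RecordingProducer {
    val history = mutableListOf<String>()
    open fun send(record: String) {
        history.add(record) // no eviction — grows without bound
    }
}

// Benchmark-only subclass: discards history per call. Safe when the benchmark
// measures timing only and never inspects what was sent — keeps memory flat.
class NonRetainingProducer : RecordingProducer() {
    override fun send(record: String) {
        super.send(record)
        history.clear()
    }
}
```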
…ush() failure (KOJAK-73)

Address PR feedback (ramafasa): if producer.flush() throws a non-Interrupt exception and the in-flight Future is still pending, a bare Future.get() in awaitOne() blocks indefinitely. Reproduced with a unit test using MockProducer(autoComplete=false) + a flush() override throwing IllegalStateException — without this fix the test hangs.

In a real KafkaProducer the worst-case wait is bounded by delivery.timeout.ms (default 2 min), but for batchSize=100 that could stall the processor thread for up to ~200 min — unacceptable. The fix bounds each await at 5 s; futures still pending past that yield RetriableFailure so the outbox retries. Happy path (flush returned normally): futures are already settled per the Kafka contract, get() returns in ~0 ms — no measurable overhead.

Adds a regression test that runs deliverBatch on a separate thread guarded by an outer 30 s budget; if anyone removes the get(timeout) defense, the test fails fast instead of wedging the test JVM.
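The bounded-await fix can be sketched as follows, under stated assumptions: `awaitBounded` and the string outcomes are simplifications of the real method and `DeliveryResult` types, and `CompletableFuture` stands in for the producer's future.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// A pending future past the bound yields a retriable outcome instead of
// blocking the processor thread; settled futures return immediately.
fun awaitBounded(future: CompletableFuture<Unit>, timeoutMs: Long = 5_000): String =
    try {
        future.get(timeoutMs, TimeUnit.MILLISECONDS)
        "Success"
    } catch (e: TimeoutException) {
        "RetriableFailure" // the outbox will retry; never wedge the thread
    } catch (e: Exception) {
        "PermanentFailure"
    }
```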
…a-deliver-batch # Conflicts: # okapi-kafka/build.gradle.kts
…2) (#46)

## What

Refreshes `benchmarks/kafka-deliverbatch.json` and `benchmarks/results-kafka-deliverbatch.md` with a full-config JMH run (`fork=2, warmup=3, iter=5`, n=10 samples per benchmark). Also lands three benchmark-infra fixes that were needed to make the run complete cleanly.

## Headline (Kafka throughput, msg/s)

| batchSize | Score (ms/op) | msg/s | vs sync-sequential baseline |
|---|---|---|---|
| 10 | 0.559 ± 0.029 | **~1,790** | 16.4× |
| 50 | 0.242 ± 0.007 | **~4,132** | 35.8× |
| 100 | 0.193 ± 0.004 | **~5,181** | 45.1× |

All Kafka error bars are <5% of score. Numbers reproduced across two independent runs (delta <3% between them).

## Benchmark infrastructure fixes (also in this PR)

Without these, the JMH run cannot complete:

1. **JMH JVM heap** — bumped to `-Xmx8g`. Throughput-mode microbenches were OOMing at the previous `-Xmx2g` because Jackson + Kotlin reflection allocate per call at ~1M ops/s rates.
2. **Liquibase duplicate-changelog workaround** — added `-Dliquibase.duplicateFileMode=WARN` to the JVM args. The fat JMH jar and `okapi-postgres.jar` both ship the changelog at the same path; Liquibase 4.x treats this as an error by default. The files are identical, so WARN is safe.
3. **`MockProducer.history` cleared after each `send()`** in `DelivererMicroBenchmark`. `MockProducer` retains every record sent for inspection; at ~1M ops/s for 30s × forks × iters that list grew to GBs and OOMed the JVM regardless of heap size. The microbench doesn't need to inspect what was sent — discarding per call is safe. With this fix, `DelivererMicroBenchmark.kafkaDeliver` now produces meaningful numbers (2.3M ± 19k ops/s) instead of `error > score`.

## Files touched

- `benchmarks/kafka-deliverbatch.json` — full-config raw results
- `benchmarks/results-kafka-deliverbatch.md` — `Score ± Error` tables + microbench section + HTTP companion table
- `README.md` — refreshed throughput table
- `okapi-benchmarks/build.gradle.kts` — heap bump + Liquibase JVM arg
- `okapi-benchmarks/src/jmh/kotlin/.../DelivererMicroBenchmark.kt` — MockProducer override

## Notes

- Run on JDK 21 LTS (matches the CLAUDE.md target).
- HTTP throughput numbers are also included for completeness — still sync sequential; KOJAK-74 will address this.
- `DelivererMicroBenchmark.httpDeliver` benchmarks the WireMock-local HTTP path; numbers are dominated by loopback TCP cost, not library overhead.

## Base branch

Based on `feature/kojak-73-kafka-deliver-batch` (PR #40). Merge after #40.

## Test plan

- [x] `./gradlew :okapi-benchmarks:jmh` — completes with `BUILD SUCCESSFUL`, all benchmarks produce non-NaN error bars
- [x] Reproducibility verified: two independent runs, Kafka throughput delta <3%
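For reference, the two JVM-arg fixes might look roughly like this in `okapi-benchmarks/build.gradle.kts` — a sketch assuming the `me.champeau.jmh` Gradle plugin's `jmh` extension, not the repo's actual file:

```kotlin
// okapi-benchmarks/build.gradle.kts (sketch; plugin DSL assumed)
jmh {
    jvmArgs.set(
        listOf(
            "-Xmx8g",                               // fix 1: heap for throughput-mode allocation rate
            "-Dliquibase.duplicateFileMode=WARN",   // fix 2: tolerate the duplicated changelog path
        ),
    )
}
```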
## Summary

Overrides `KafkaMessageDeliverer.deliverBatch` with the fire-flush-await pattern. Replaces N sequential `producer.send().get()` round-trips with one batched network round-trip via `producer.flush()`.

### KOJAK-73 — Kafka deliverBatch
Builds on KOJAK-72 (deliverBatch interface).
## Headline numbers

Smoke benchmark vs the KOJAK-68 baseline (same hardware: MacBook M3 Max, JDK 25 LTS, Postgres 16 + Kafka 3.8.1 via Testcontainers, fork=1, warmup=1, iter=2, warmup=10s, measurement=15s):

| batchSize | baseline | KOJAK-73 | improvement |
|---|---|---|---|
| 10 | ~109 msg/s | ~1,468 msg/s | 13.5x |
| 50 | ~115 msg/s | ~3,731 msg/s | 32.3x |
| 100 | ~115 msg/s | ~4,717 msg/s | 41.0x |

`batchSize` is now load-bearing. Pre-KOJAK-73 throughput was flat across batchSize values — the bottleneck was the per-record blocking `producer.send().get()`. Post-KOJAK-73 throughput scales with batchSize, proving Kafka's internal record batching is being exploited.

Full results + interpretation: `benchmarks/results-postopt-kojak-73.md`.

## Implementation
The `SendOutcome` sealed type distinguishes synchronous send exceptions (e.g., `BufferExhaustedException`, `SerializationException`) from in-flight futures — one failing send never aborts the batch. `deliver()` is refactored to share the `buildRecord`/`classifyException` helpers with `deliverBatch` — no behavior change for the single-entry path.

## Test coverage
New unit tests in `KafkaMessageDelivererBatchTest`:

- empty input
- all-success ordering
- exactly one `flush()` call (verified via flush counter override)
- synchronous Permanent + Retriable send exceptions
- future-based async exceptions (via a `MockProducer` override that drives `completeNext`/`errorNext` from inside flush)

Integration tests in `okapi-integration-tests` continue to pass with real Kafka.

## Sublinear scaling: KOJAK-75 motivation
Going from `batchSize=50 → 100` gives 32× → 41× (only ~26% more, not 2×). At larger batches, the N individual `updateAfterProcessing` calls become significant relative to the now-fast Kafka path. This is exactly what KOJAK-75 (batch UPDATE via `executeBatch`) addresses — it'll be the next big lever once HTTP deliverBatch (KOJAK-74) lands.

## Test plan
- `./gradlew test ktlintCheck -x :okapi-integration-tests:test` — all unit tests + ktlint pass
- `./gradlew :okapi-integration-tests:test` — full Testcontainers suite (Postgres + MySQL + Kafka + WireMock) passes
- `./gradlew :okapi-benchmarks:jmhJar` + smoke Kafka throughput run — captured the numbers above