diff --git a/README.md b/README.md index 1130bb2..b9a6cc2 100644 --- a/README.md +++ b/README.md @@ -265,17 +265,17 @@ graph BT ## Performance -Throughput baseline (single instance, sync sequential delivery, MacBook M3 Max, JDK 25 LTS, April 2026): +Throughput on a single instance (MacBook M3 Max, JDK 21 LTS, May 2026): | Transport | batchSize=10 | batchSize=100 | |-----------|--------------|----------------| -| Kafka (`acks=all`, localhost broker) | ~110 msg/s | ~115 msg/s | -| HTTP @ webhook latency 20 ms | ~33 msg/s | ~36 msg/s | -| HTTP @ webhook latency 100 ms | ~9 msg/s | ~9 msg/s | +| Kafka (`acks=all`, localhost broker, async batch via `deliverBatch`) | **~1,790 msg/s** | **~5,180 msg/s** | +| HTTP @ webhook latency 20 ms (sync sequential — parallel `sendAsync` planned) | ~38 msg/s | ~38 msg/s | +| HTTP @ webhook latency 100 ms (sync sequential — parallel `sendAsync` planned) | ~9 msg/s | ~9 msg/s | -These numbers reflect the current sync-sequential delivery model. Throughput is bounded by per-message round-trip time × batch size. +Kafka throughput jumped 16-45× over the original sync-sequential baseline thanks to the `deliverBatch` fire-flush-await pattern. HTTP parallel `sendAsync` is next; multi-threaded scheduler scaling is in the roadmap. -Full methodology, raw JMH results, and reproduction instructions: [`benchmarks/`](benchmarks/). +Full methodology, raw JMH results, before/after per change: [`benchmarks/`](benchmarks/). ## Build diff --git a/benchmarks/kafka-deliverbatch.json b/benchmarks/kafka-deliverbatch.json new file mode 100644 index 0000000..34e92f9 --- /dev/null +++ b/benchmarks/kafka-deliverbatch.json @@ -0,0 +1,917 @@ +[ + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.DelivererMicroBenchmark.httpDeliver", + "mode" : "thrpt", + "threads" : 1, + "forks" : 2, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/21.0.7-tem/bin/java", + "jvmArgs" : [ + "-Xms8g", + "-Xmx8g", + "-XX:+UseG1GC", + "-Dliquibase.duplicateFileMode=WARN" + ], + "jdkVersion" : "21.0.7", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "21.0.7+6-LTS", + "warmupIterations" : 3, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 5, + "measurementTime" : "30 s", + "measurementBatchSize" : 1, + "primaryMetric" : { + "score" : 11545.408711236381, + "scoreError" : 149.42306901827996, + "scoreConfidence" : [ + 11395.985642218102, + 11694.83178025466 + ], + "scorePercentiles" : { + "0.0" : 11343.577838549423, + "50.0" : 11546.976349591305, + "90.0" : 11700.020357530904, + "95.0" : 11706.626583508236, + "99.0" : 11706.626583508236, + "99.9" : 11706.626583508236, + "99.99" : 11706.626583508236, + "99.999" : 11706.626583508236, + "99.9999" : 11706.626583508236, + "100.0" : 11706.626583508236 + }, + "scoreUnit" : "ops/s", + "rawData" : [ + [ + 11343.577838549423, + 11562.043798539653, + 11595.069729741723, + 11706.626583508236, + 11479.24403639076 + ], + [ + 11583.270612608234, + 11494.222223496243, + 11517.559065151681, + 11640.564323734912, + 11531.908900642957 + ] + ] + }, + "secondaryMetrics" : { + } + }, + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.DelivererMicroBenchmark.kafkaDeliver", + "mode" : "thrpt", + "threads" : 1, + "forks" : 2, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/21.0.7-tem/bin/java", + "jvmArgs" : [ + "-Xms8g", + "-Xmx8g", + "-XX:+UseG1GC", + "-Dliquibase.duplicateFileMode=WARN" + ], + "jdkVersion" : "21.0.7", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "21.0.7+6-LTS", + "warmupIterations" : 3, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 5, + "measurementTime" : "30 s", + "measurementBatchSize" : 1, + "primaryMetric" : { + "score" : 2324097.801992168, + "scoreError" : 19574.869254926263, + "scoreConfidence" : [ + 2304522.932737242, + 2343672.6712470944 + ], + "scorePercentiles" : { + "0.0" : 2301087.014327902, + "50.0" : 2323205.498179472, + "90.0" : 2342128.692513277, + "95.0" : 2342268.944150268, + "99.0" : 2342268.944150268, + "99.9" : 2342268.944150268, + "99.99" : 2342268.944150268, + "99.999" : 2342268.944150268, + "99.9999" : 2342268.944150268, + "100.0" : 2342268.944150268 + }, + "scoreUnit" : "ops/s", + "rawData" : [ + [ + 2331185.5678282077, + 2332932.021373383, + 2323417.3330399687, + 2318151.902267157, + 2313764.626713388 + ], + [ + 2301087.014327902, + 2314310.519122078, + 2340866.427780357, + 2322993.663318975, + 2342268.944150268 + ] + ] + }, + "secondaryMetrics" : { + } + }, + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.HttpThroughputBenchmark.drainAll", + "mode" : "avgt", + "threads" : 1, + "forks" : 2, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/21.0.7-tem/bin/java", + "jvmArgs" : [ + "-Xms8g", + "-Xmx8g", + "-XX:+UseG1GC", + "-Dliquibase.duplicateFileMode=WARN" + ], + "jdkVersion" : "21.0.7", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "21.0.7+6-LTS", + "warmupIterations" : 3, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 5, + "measurementTime" : "30 s", + "measurementBatchSize" : 1, + "params" : { + "batchSize" : "10", + "httpLatencyMs" : "0" + }, + "primaryMetric" : { + "score" : 0.6379889016144873, + "scoreError" : 0.021636516181889328, + "scoreConfidence" : [ + 0.616352385432598, + 0.6596254177963766 + ], + "scorePercentiles" : { + "0.0" : 0.61547277085, + "50.0" : 0.6390188067891363, + "90.0" : 0.6598710867894737, + "95.0" : 0.660892214, + "99.0" : 0.660892214, + "99.9" : 0.660892214, + "99.99" : 0.660892214, + "99.999" : 0.660892214, + "99.9999" : 0.660892214, + "100.0" : 0.660892214 + }, + "scoreUnit" : "ms/op", + "rawData" : [ + [ + 0.634514846025641, + 0.625434998923077, + 0.61547277085, + 0.660892214, + 0.6327974294358975 + ], + [ + 0.6448399056052632, + 0.6506809418947368, + 0.6435227675526316, + 0.6494216974473684, + 0.6223114444102564 + ] + ] + }, + "secondaryMetrics" : { + } + }, + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.HttpThroughputBenchmark.drainAll", + "mode" : "avgt", + "threads" : 1, + "forks" : 2, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/21.0.7-tem/bin/java", + "jvmArgs" : [ + "-Xms8g", + "-Xmx8g", + "-XX:+UseG1GC", + "-Dliquibase.duplicateFileMode=WARN" + ], + "jdkVersion" : "21.0.7", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "21.0.7+6-LTS", + "warmupIterations" : 3, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 5, + "measurementTime" : "30 s", + "measurementBatchSize" : 1, + "params" : { + "batchSize" : "10", + "httpLatencyMs" : "20" + }, + "primaryMetric" : { + "score" : 26.4285953021, + "scoreError" : 0.5990869571634768, + "scoreConfidence" : [ + 25.829508344936524, + 27.027682259263475 + ], + "scorePercentiles" : { + "0.0" : 25.8099825625, + "50.0" : 26.4697215625, + "90.0" : 27.1272064871, + "95.0" : 27.1769992495, + "99.0" : 27.1769992495, + "99.9" : 27.1769992495, + "99.99" : 27.1769992495, + "99.999" : 27.1769992495, + "99.9999" : 27.1769992495, + "100.0" : 27.1769992495 + }, + "scoreUnit" : "ms/op", + "rawData" : [ + [ + 26.6790716255, + 26.5175469165, + 26.4532531875, + 26.254341833, + 26.4277320625 + ], + [ + 27.1769992495, + 26.6242391255, + 26.4861899375, + 25.856596521, + 25.8099825625 + ] + ] + }, + "secondaryMetrics" : { + } + }, + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.HttpThroughputBenchmark.drainAll", + "mode" : "avgt", + "threads" : 1, + "forks" : 2, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/21.0.7-tem/bin/java", + "jvmArgs" : [ + "-Xms8g", + "-Xmx8g", + "-XX:+UseG1GC", + "-Dliquibase.duplicateFileMode=WARN" + ], + "jdkVersion" : "21.0.7", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "21.0.7+6-LTS", + "warmupIterations" : 3, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 5, + "measurementTime" : "30 s", + "measurementBatchSize" : 1, + "params" : { + "batchSize" : "10", + "httpLatencyMs" : "100" + }, + "primaryMetric" : { + "score" : 108.51451902080001, + "scoreError" : 1.788240275703777, + "scoreConfidence" : [ + 106.72627874509624, + 110.30275929650378 + ], + "scorePercentiles" : { + "0.0" : 106.7793465, + "50.0" : 108.8223827915, + "90.0" : 110.1245160836, + "95.0" : 110.174805667, + "99.0" : 110.174805667, + "99.9" : 110.174805667, + "99.99" : 110.174805667, + "99.999" : 110.174805667, + "99.9999" : 110.174805667, + "100.0" : 110.174805667 + }, + "scoreUnit" : "ms/op", + "rawData" : [ + [ + 110.174805667, + 109.671909833, + 109.338547667, + 108.818140291, + 109.069220459 + ], + [ + 108.333270041, + 108.826625292, + 106.8633295, + 106.7793465, + 107.269994958 + ] + ] + }, + "secondaryMetrics" : { + } + }, + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.HttpThroughputBenchmark.drainAll", + "mode" : "avgt", + "threads" : 1, + "forks" : 2, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/21.0.7-tem/bin/java", + "jvmArgs" : [ + "-Xms8g", + "-Xmx8g", + "-XX:+UseG1GC", + "-Dliquibase.duplicateFileMode=WARN" + ], + "jdkVersion" : "21.0.7", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "21.0.7+6-LTS", + "warmupIterations" : 3, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 5, + "measurementTime" : "30 s", + "measurementBatchSize" : 1, + "params" : { + "batchSize" : "50", + "httpLatencyMs" : "0" + }, + "primaryMetric" : { + "score" : 0.3214151334021354, + "scoreError" : 0.007895476854073952, + "scoreConfidence" : [ + 0.31351965654806146, + 0.32931061025620934 + ], + "scorePercentiles" : { + "0.0" : 0.31663749476923075, + "50.0" : 0.31942743214687497, + "90.0" : 0.332567413640041, + "95.0" : 0.3332752345806452, + "99.0" : 0.3332752345806452, + "99.9" : 0.3332752345806452, + "99.99" : 0.3332752345806452, + "99.999" : 0.3332752345806452, + "99.9999" : 0.3332752345806452, + "100.0" : 0.3332752345806452 + }, + "scoreUnit" : "ms/op", + "rawData" : [ + [ + 0.3261970251746032, + 0.3188090192, + 0.32004584509375, + 0.3183778794153846, + 0.31789787175384615 + ], + [ + 0.3203602012, + 0.31663749476923075, + 0.324996212203125, + 0.3332752345806452, + 0.3175545506307692 + ] + ] + }, + "secondaryMetrics" : { + } + }, + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.HttpThroughputBenchmark.drainAll", + "mode" : "avgt", + "threads" : 1, + "forks" : 2, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/21.0.7-tem/bin/java", + "jvmArgs" : [ + "-Xms8g", + "-Xmx8g", + "-XX:+UseG1GC", + "-Dliquibase.duplicateFileMode=WARN" + ], + "jdkVersion" : "21.0.7", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "21.0.7+6-LTS", + "warmupIterations" : 3, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 5, + "measurementTime" : "30 s", + "measurementBatchSize" : 1, + "params" : { + "batchSize" : "50", + "httpLatencyMs" : "20" + }, + "primaryMetric" : { + "score" : 24.8917961376, + "scoreError" : 0.6746586059155605, + "scoreConfidence" : [ + 24.21713753168444, + 25.56645474351556 + ], + "scorePercentiles" : { + "0.0" : 24.373944313, + "50.0" : 24.86101904175, + "90.0" : 25.678303015, + "95.0" : 25.7133840005, + "99.0" : 25.7133840005, + "99.9" : 25.7133840005, + "99.99" : 25.7133840005, + "99.999" : 25.7133840005, + "99.9999" : 25.7133840005, + "100.0" : 25.7133840005 + }, + "scoreUnit" : "ms/op", + "rawData" : [ + [ + 24.528713896, + 24.373944313, + 24.587876104, + 24.3818122715, + 24.7403532705 + ], + [ + 25.7133840005, + 25.3625741455, + 25.1709619165, + 25.0766566455, + 24.981684813 + ] + ] + }, + "secondaryMetrics" : { + } + }, + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.HttpThroughputBenchmark.drainAll", + "mode" : "avgt", + "threads" : 1, + "forks" : 2, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/21.0.7-tem/bin/java", + "jvmArgs" : [ + "-Xms8g", + "-Xmx8g", + "-XX:+UseG1GC", + "-Dliquibase.duplicateFileMode=WARN" + ], + "jdkVersion" : "21.0.7", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "21.0.7+6-LTS", + "warmupIterations" : 3, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 5, + "measurementTime" : "30 s", + "measurementBatchSize" : 1, + "params" : { + "batchSize" : "50", + "httpLatencyMs" : "100" + }, + "primaryMetric" : { + "score" : 105.3130679416, + "scoreError" : 1.0105834148150667, + "scoreConfidence" : [ + 104.30248452678492, + 106.32365135641507 + ], + "scorePercentiles" : { + "0.0" : 104.48651125, + "50.0" : 105.22548381300001, + "90.0" : 106.25646993720001, + "95.0" : 106.277158583, + "99.0" : 106.277158583, + "99.9" : 106.277158583, + "99.99" : 106.277158583, + "99.999" : 106.277158583, + "99.9999" : 106.277158583, + "100.0" : 106.277158583 + }, + "scoreUnit" : "ms/op", + "rawData" : [ + [ + 105.101210667, + 104.867716291, + 104.744317333, + 104.505663375, + 104.48651125 + ], + [ + 105.349756959, + 105.928005333, + 106.070272125, + 105.8000675, + 106.277158583 + ] + ] + }, + "secondaryMetrics" : { + } + }, + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.HttpThroughputBenchmark.drainAll", + "mode" : "avgt", + "threads" : 1, + "forks" : 2, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/21.0.7-tem/bin/java", + "jvmArgs" : [ + "-Xms8g", + "-Xmx8g", + "-XX:+UseG1GC", + "-Dliquibase.duplicateFileMode=WARN" + ], + "jdkVersion" : "21.0.7", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "21.0.7+6-LTS", + "warmupIterations" : 3, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 5, + "measurementTime" : "30 s", + "measurementBatchSize" : 1, + "params" : { + "batchSize" : "100", + "httpLatencyMs" : "0" + }, + "primaryMetric" : { + "score" : 0.29028892570554626, + "scoreError" : 0.007918583246762624, + "scoreConfidence" : [ + 0.2823703424587836, + 0.2982075089523089 + ], + "scorePercentiles" : { + "0.0" : 0.2843618309714286, + "50.0" : 0.2886647397028985, + "90.0" : 0.29998830778948443, + "95.0" : 0.3004070461060606, + "99.0" : 0.3004070461060606, + "99.9" : 0.3004070461060606, + "99.99" : 0.3004070461060606, + "99.999" : 0.3004070461060606, + "99.9999" : 0.3004070461060606, + "100.0" : 0.3004070461060606 + }, + "scoreUnit" : "ms/op", + "rawData" : [ + [ + 0.2852251713857143, + 0.28640042515942027, + 0.28869611163768116, + 0.3004070461060606, + 0.2962196629402985 + ], + [ + 0.2937898953529412, + 0.28863336776811593, + 0.2843618309714286, + 0.28684761410144927, + 0.2923081316323529 + ] + ] + }, + "secondaryMetrics" : { + } + }, + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.HttpThroughputBenchmark.drainAll", + "mode" : "avgt", + "threads" : 1, + "forks" : 2, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/21.0.7-tem/bin/java", + "jvmArgs" : [ + "-Xms8g", + "-Xmx8g", + "-XX:+UseG1GC", + "-Dliquibase.duplicateFileMode=WARN" + ], + "jdkVersion" : "21.0.7", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "21.0.7+6-LTS", + "warmupIterations" : 3, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 5, + "measurementTime" : "30 s", + "measurementBatchSize" : 1, + "params" : { + "batchSize" : "100", + "httpLatencyMs" : "20" + }, + "primaryMetric" : { + "score" : 26.5454311687, + "scoreError" : 2.321655368816006, + "scoreConfidence" : [ + 24.22377579988399, + 28.867086537516006 + ], + "scorePercentiles" : { + "0.0" : 24.9625513745, + "50.0" : 26.56031102075, + "90.0" : 28.2665264435, + "95.0" : 28.266955333, + "99.0" : 28.266955333, + "99.9" : 28.266955333, + "99.99" : 28.266955333, + "99.999" : 28.266955333, + "99.9999" : 28.266955333, + "100.0" : 28.266955333 + }, + "scoreUnit" : "ms/op", + "rawData" : [ + [ + 25.347648792, + 24.9837100835, + 25.181693521, + 25.029404917, + 24.9625513745 + ], + [ + 28.262666438, + 28.266955333, + 27.805809958, + 27.8408980205, + 27.7729732495 + ] + ] + }, + "secondaryMetrics" : { + } + }, + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.HttpThroughputBenchmark.drainAll", + "mode" : "avgt", + "threads" : 1, + "forks" : 2, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/21.0.7-tem/bin/java", + "jvmArgs" : [ + "-Xms8g", + "-Xmx8g", + "-XX:+UseG1GC", + "-Dliquibase.duplicateFileMode=WARN" + ], + "jdkVersion" : "21.0.7", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "21.0.7+6-LTS", + "warmupIterations" : 3, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 5, + "measurementTime" : "30 s", + "measurementBatchSize" : 1, + "params" : { + "batchSize" : "100", + "httpLatencyMs" : "100" + }, + "primaryMetric" : { + "score" : 107.71372295009999, + "scoreError" : 2.74520541358524, + "scoreConfidence" : [ + 104.96851753651475, + 110.45892836368523 + ], + "scorePercentiles" : { + "0.0" : 105.724950417, + "50.0" : 107.57224191649999, + "90.0" : 110.0029172711, + "95.0" : 110.010051792, + "99.0" : 110.010051792, + "99.9" : 110.010051792, + "99.99" : 110.010051792, + "99.999" : 110.010051792, + "99.9999" : 110.010051792, + "100.0" : 110.010051792 + }, + "scoreUnit" : "ms/op", + "rawData" : [ + [ + 109.938706583, + 110.010051792, + 109.341732042, + 108.785946125, + 108.872735042 + ], + [ + 106.358537708, + 106.217716584, + 105.876776708, + 106.0100765, + 105.724950417 + ] + ] + }, + "secondaryMetrics" : { + } + }, + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.KafkaThroughputBenchmark.drainAll", + "mode" : "avgt", + "threads" : 1, + "forks" : 2, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/21.0.7-tem/bin/java", + "jvmArgs" : [ + "-Xms8g", + "-Xmx8g", + "-XX:+UseG1GC", + "-Dliquibase.duplicateFileMode=WARN" + ], + "jdkVersion" : "21.0.7", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "21.0.7+6-LTS", + "warmupIterations" : 3, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 5, + "measurementTime" : "30 s", + "measurementBatchSize" : 1, + "params" : { + "batchSize" : "10" + }, + "primaryMetric" : { + "score" : 0.5589797273644752, + "scoreError" : 0.02867462504485346, + "scoreConfidence" : [ + 0.5303051023196218, + 0.5876543524093286 + ], + "scorePercentiles" : { + "0.0" : 0.5357481022727273, + "50.0" : 0.5593382570777963, + "90.0" : 0.5999115948594048, + "95.0" : 0.603456329275, + "99.0" : 0.603456329275, + "99.9" : 0.603456329275, + "99.99" : 0.603456329275, + "99.999" : 0.603456329275, + "99.9999" : 0.603456329275, + "100.0" : 0.603456329275 + }, + "scoreUnit" : "ms/op", + "rawData" : [ + [ + 0.5357481022727273, + 0.5458592974651163, + 0.5430034906136364, + 0.5579124534651163, + 0.5467915667674419 + ], + [ + 0.5656928481428571, + 0.5625601398333333, + 0.5680089851190476, + 0.603456329275, + 0.5607640606904762 + ] + ] + }, + "secondaryMetrics" : { + } + }, + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.KafkaThroughputBenchmark.drainAll", + "mode" : "avgt", + "threads" : 1, + "forks" : 2, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/21.0.7-tem/bin/java", + "jvmArgs" : [ + "-Xms8g", + "-Xmx8g", + "-XX:+UseG1GC", + "-Dliquibase.duplicateFileMode=WARN" + ], + "jdkVersion" : "21.0.7", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "21.0.7+6-LTS", + "warmupIterations" : 3, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 5, + "measurementTime" : "30 s", + "measurementBatchSize" : 1, + "params" : { + "batchSize" : "50" + }, + "primaryMetric" : { + "score" : 0.24217350536729368, + "scoreError" : 0.006516777387496404, + "scoreConfidence" : [ + 0.23565672797979728, + 0.24869028275479008 + ], + "scorePercentiles" : { + "0.0" : 0.23278964667088609, + "50.0" : 0.24250253058467192, + "90.0" : 0.24726430823421053, + "95.0" : 0.2474329227763158, + "99.0" : 0.2474329227763158, + "99.9" : 0.2474329227763158, + "99.99" : 0.2474329227763158, + "99.999" : 0.2474329227763158, + "99.9999" : 0.2474329227763158, + "100.0" : 0.2474329227763158 + }, + "scoreUnit" : "ms/op", + "rawData" : [ + [ + 0.24562889861842105, + 0.24001745723376625, + 0.24574677735526315, + 0.24101435612987013, + 0.23278964667088609 + ], + [ + 0.2474329227763158, + 0.24399070503947368, + 0.24502732236842106, + 0.24041796587012987, + 0.23966900161038962 + ] + ] + }, + "secondaryMetrics" : { + } + }, + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.KafkaThroughputBenchmark.drainAll", + "mode" : "avgt", + "threads" : 1, + "forks" : 2, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/21.0.7-tem/bin/java", + "jvmArgs" : [ + "-Xms8g", + "-Xmx8g", + "-XX:+UseG1GC", + "-Dliquibase.duplicateFileMode=WARN" + ], + "jdkVersion" : "21.0.7", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "21.0.7+6-LTS", + "warmupIterations" : 3, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 5, + "measurementTime" : "30 s", + "measurementBatchSize" : 1, + "params" : { + "batchSize" : "100" + }, + "primaryMetric" : { + "score" : 0.19282190281664366, + "scoreError" : 0.003887357443773245, + "scoreConfidence" : [ + 0.1889345453728704, + 0.1967092602604169 + ], + "scorePercentiles" : { + "0.0" : 0.19034086528888888, + "50.0" : 0.1919761961235955, + "90.0" : 0.19834123139408305, + "95.0" : 0.19862080697701148, + "99.0" : 0.19862080697701148, + "99.9" : 0.19862080697701148, + "99.99" : 0.19862080697701148, + "99.999" : 0.19862080697701148, + "99.9999" : 0.19862080697701148, + "100.0" : 0.19862080697701148 + }, + "scoreUnit" : "ms/op", + "rawData" : [ + [ + 0.19862080697701148, + 0.19582505114772727, + 0.191856503247191, + 0.192095889, + 0.19165331980898875 + ], + [ + 0.19349963433707865, + 0.1921436610224719, + 0.19178751501123595, + 0.1903957823258427, + 0.19034086528888888 + ] + ] + }, + "secondaryMetrics" : { + } + } +] + + diff --git a/benchmarks/results-kafka-deliverbatch.md b/benchmarks/results-kafka-deliverbatch.md new file mode 100644 index 0000000..44f5ace --- /dev/null +++ b/benchmarks/results-kafka-deliverbatch.md @@ -0,0 +1,99 @@ +# Kafka deliverBatch fire-flush-await — Results (KOJAK-73) + +Measured on MacBook M3 Max, JDK 21 LTS, Postgres 16 + Kafka 3.8.1 via Testcontainers, +full JMH config: `fork=2, warmup=3 × 10s, iter=5 × 30s` — n=10 samples per benchmark. + +## Headline numbers — Kafka throughput + +| batchSize | Baseline (ms/op) | Optimized (ms/op) | **Improvement** | +|-----------|------------------|-------------------|-----------------| +| 10 | 9.168 | 0.559 ± 0.029 | **16.4×** | +| 50 | 8.665 | 0.242 ± 0.007 | **35.8×** | +| 100 | 8.701 | 0.193 ± 0.004 | **45.1×** | + +Translated to msg/s (`@OperationsPerInvocation(1000)`): + +| batchSize | Baseline | Optimized | Improvement | +|-----------|------------|------------------|-------------| +| 10 | ~109 msg/s | **~1,790 msg/s** | 16.4× | +| 50 | ~115 msg/s | **~4,132 msg/s** | 35.8× | +| 100 | ~115 msg/s | **~5,181 msg/s** | 45.1× | + +Raw JSON: [`kafka-deliverbatch.json`](kafka-deliverbatch.json). + +## What changed + +`KafkaMessageDeliverer.deliverBatch` now uses fire-flush-await: +1. **Fire** — call `producer.send()` for every entry (non-blocking; records go to producer's internal buffer) +2. **Flush** — single `producer.flush()` call drives all queued records to the broker in one batched network round-trip (bypasses `linger.ms`) +3. **Await** — `Future.get()` per entry returns immediately because completion is settled by `flush()` + +Previously, each entry incurred a full `producer.send().get()` round-trip sequentially. With ~9 ms localhost Kafka RTT (`acks=all`), 1000 entries × 9 ms = ~9 s regardless of `batchSize`. + +## Reading the table + +- **`batchSize` is now load-bearing.** Pre-optimization throughput was flat across `batchSize` + values (109 → 115 → 115 msg/s) — confirming the bottleneck was per-record blocking I/O. + Post-optimization throughput scales with `batchSize` (1,790 → 4,132 → 5,181), proving that + Kafka's internal record batching is now being exploited. +- **Sublinear scaling 50 → 100** (36× → 45× vs expected ~2× more). Indicates that DB UPDATE + overhead per entry is now significant relative to the (now-fast) Kafka path. This is exactly + what motivates the batch UPDATE optimization via JDBC `executeBatch` (KOJAK-75) — at small + batch sizes the per-message DB cost was hidden by 9 ms Kafka RTT; with Kafka latency removed, + the N individual UPDATE statements become the next bottleneck to attack. +- **batchSize=10 lowest gain (16.4×)** — at that batch size only 10 records can amortize + one RTT, so the per-batch overhead (claimPending, transaction begin/commit, 10 UPDATEs) is + proportionally larger. +- **All Kafka throughput error bars <5% of score** — confidence intervals are narrow enough + to defend the multipliers. Numbers independently reproduced across two separate runs. + +## Code overhead microbenchmarks + +`DelivererMicroBenchmark` measures the cost of `deliver()` with I/O mocked away — useful as +a regression check on the library code itself (Jackson deserialize + record construction + +exception classification + result wrapping). + +| Benchmark | Score | Notes | +|--------------|------------------------|--------------------------------------------------| +| kafkaDeliver | 2,324,098 ± 19,575 ops/s | ~430 ns per `deliver()` (MockProducer, no I/O) | +| httpDeliver | 11,545 ± 149 ops/s | ~87 µs per `deliver()` (WireMock localhost) | + +In production these numbers are dominated by network I/O (~10 ms localhost Kafka, ~5-50 ms +HTTP webhook), so the library overhead is <1% of real-world per-message cost. Microbench is +there to catch regressions if anyone refactors `KafkaMessageDeliverer`/`HttpMessageDeliverer` +and accidentally adds allocations or expensive work to the hot path. + +## HTTP throughput (companion benchmark) + +HTTP path remains sync sequential (KOJAK-74 will apply parallel `sendAsync`). Numbers below +show per-message cost at different webhook latencies — useful for understanding the gap that +KOJAK-74 closes: + +| batchSize | latency 0 ms | latency 20 ms | latency 100 ms | +|-----------|------------------|-------------------|-------------------| +| 10 | 0.638 ms/op | 26.429 ms/op | 108.515 ms/op | +| 50 | 0.321 ms/op | 24.892 ms/op | 105.313 ms/op | +| 100 | 0.290 ms/op | 26.545 ms/op | 107.714 ms/op | + +Flat per-message latency at `latencyMs=20/100` confirms HTTP is fully sequential: each request +waits for the previous response before the next goes out. + +## Verification context + +- Unit tests: `KafkaMessageDelivererBatchTest` covers empty input, all-success ordering, + single flush call (verified via flush counter), synchronous send exception (Permanent + + Retriable variants), and future-based async exception. +- Integration tests in `okapi-integration-tests` continue to pass with real Postgres + Kafka. +- ktlint clean, configuration cache reuses across modules. + +## What's next + +1. **HTTP `deliverBatch`** (KOJAK-74) — analogous fire-all-await for HTTP via parallel + `httpClient.sendAsync`. Expected impact at realistic webhook latency + (`httpLatencyMs ∈ {20, 100}`): from ~38 / ~9 msg/s baseline to **~500-2,000 msg/s** range, + depending on host/connection pool reuse. +2. **Batch UPDATE via JDBC `executeBatch`** (KOJAK-75). Now load-bearing: at `batchSize=100` + the N individual UPDATE statements have become the dominant per-batch cost. Expected + to shift `batchSize=100` Kafka throughput from ~5,200 toward the ~10,000 msg/s range. +3. **Concurrent processor fan-out** (KOJAK-77) — multi-threaded scheduler. Multiplies all + of the above by N workers. diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/CompositeMessageDeliverer.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/CompositeMessageDeliverer.kt index 39eb076..1ba4ef6 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/CompositeMessageDeliverer.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/CompositeMessageDeliverer.kt @@ -26,7 +26,7 @@ class CompositeMessageDeliverer(deliverers: List) : MessageDel * Entries whose type has no registered deliverer are mapped to * [DeliveryResult.PermanentFailure] (consistent with [deliver]). */ - override fun deliverBatch(entries: List): List> { + override fun deliverBatch(entries: List): List { if (entries.isEmpty()) return emptyList() val resultByEntry: Map = entries @@ -36,11 +36,13 @@ class CompositeMessageDeliverer(deliverers: List) : MessageDel if (deliverer != null) { deliverer.deliverBatch(group) } else { - group.map { it to DeliveryResult.PermanentFailure("No deliverer registered for type '$type'") } + group.map { DeliveryOutcome(it, DeliveryResult.PermanentFailure("No deliverer registered for type '$type'")) } } } - .toMap() + .associate { it.entry to it.result } - return entries.map { entry -> entry to (resultByEntry[entry] ?: error("missing result for entry ${entry.outboxId}")) } + return entries.map { entry -> + DeliveryOutcome(entry, resultByEntry[entry] ?: error("missing result for entry ${entry.outboxId}")) + } } } diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/DeliveryOutcome.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/DeliveryOutcome.kt new file mode 100644 index 0000000..312a48f --- /dev/null +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/DeliveryOutcome.kt @@ -0,0 +1,12 @@ +package com.softwaremill.okapi.core + +/** + * Per-entry result of one [MessageDeliverer.deliverBatch] invocation. + * + * Transient transport-layer report — consumed by [OutboxEntryProcessor] + * in the same batch cycle and never persisted. + */ +data class DeliveryOutcome( + val entry: OutboxEntry, + val result: DeliveryResult, +) diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/MessageDeliverer.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/MessageDeliverer.kt index 38f1bfe..43ea43e 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/MessageDeliverer.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/MessageDeliverer.kt @@ -9,6 +9,12 @@ package com.softwaremill.okapi.core interface MessageDeliverer { val type: String + /** + * Delivers a single entry. MUST NOT throw — transport-level errors surface + * as [DeliveryResult.RetriableFailure] (transient: network, timeout, interrupt) + * or [DeliveryResult.PermanentFailure] (won't fix itself: corrupt metadata, + * missing service, auth, payload too large). + */ fun deliver(entry: OutboxEntry): DeliveryResult /** @@ -20,8 +26,12 @@ interface MessageDeliverer { * be overlapped (e.g. Kafka's internal record batching, parallel HTTP * `sendAsync`) should override this method to exploit that. * - * Per-entry result classification (Success / RetriableFailure / PermanentFailure) - * is preserved — callers receive one [DeliveryResult] per input entry. + * Implementations MUST NOT abort the batch on individual entry failure; + * the returned list always has the same size as [entries], in input order, + * with each entry independently classified as Success / RetriableFailure / + * PermanentFailure. This method MUST NOT throw — transport-level errors + * surface as [DeliveryResult.RetriableFailure] or [DeliveryResult.PermanentFailure] + * on the affected entries. */ - fun deliverBatch(entries: List): List> = entries.map { entry -> entry to deliver(entry) } + fun deliverBatch(entries: List): List = entries.map { entry -> DeliveryOutcome(entry, deliver(entry)) } } diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/CompositeMessageDelivererTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/CompositeMessageDelivererTest.kt index 4716327..81eb719 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/CompositeMessageDelivererTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/CompositeMessageDelivererTest.kt @@ -37,11 +37,11 @@ class CompositeMessageDelivererTest : FunSpec({ val results = composite.deliverBatch(entries) results.size shouldBe 4 - results.map { it.first } shouldBe entries - results[0].second shouldBe DeliveryResult.Success - results[1].second shouldBe DeliveryResult.RetriableFailure("503") - results[2].second shouldBe DeliveryResult.Success - results[3].second shouldBe DeliveryResult.RetriableFailure("503") + results.map { it.entry } shouldBe entries + results[0].result shouldBe DeliveryResult.Success + results[1].result shouldBe DeliveryResult.RetriableFailure("503") + results[2].result shouldBe DeliveryResult.Success + results[3].result shouldBe DeliveryResult.RetriableFailure("503") } test("deliverBatch fails permanently for entries with no registered deliverer") { @@ -56,9 +56,9 @@ class CompositeMessageDelivererTest : FunSpec({ val results = composite.deliverBatch(entries) results.size shouldBe 2 - results[0].second shouldBe DeliveryResult.Success - results[1].second.shouldBeInstanceOf() - (results[1].second as DeliveryResult.PermanentFailure).error shouldContain "missing" + results[0].result shouldBe DeliveryResult.Success + results[1].result.shouldBeInstanceOf() + (results[1].result as DeliveryResult.PermanentFailure).error shouldContain "missing" } test("deliverBatch with empty input returns empty list") { @@ -72,17 +72,17 @@ class CompositeMessageDelivererTest : FunSpec({ val kafkaDeliverer = object : MessageDeliverer { override val type = "kafka" override fun deliver(entry: OutboxEntry): DeliveryResult = DeliveryResult.Success - override fun deliverBatch(entries: List): List> { + override fun deliverBatch(entries: List): List { batchCallsKafka++ - return entries.map { it to DeliveryResult.Success } + return entries.map { DeliveryOutcome(it, DeliveryResult.Success) } } } val httpDeliverer = object : MessageDeliverer { override val type = "http" override fun deliver(entry: OutboxEntry): DeliveryResult = DeliveryResult.Success - override fun deliverBatch(entries: List): List> { + override fun deliverBatch(entries: List): List { batchCallsHttp++ - return entries.map { it to DeliveryResult.Success } + return entries.map { DeliveryOutcome(it, DeliveryResult.Success) } } } val composite = CompositeMessageDeliverer(listOf(kafkaDeliverer, httpDeliverer)) diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/MessageDelivererTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/MessageDelivererTest.kt index 201900c..27c3753 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/MessageDelivererTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/MessageDelivererTest.kt @@ -29,12 +29,12 @@ class MessageDelivererTest : FunSpec({ val results = deliverer.deliverBatch(entries) results.size shouldBe 3 - results[0].first shouldBe entries[0] - results[0].second shouldBe DeliveryResult.Success - results[1].first shouldBe entries[1] - results[1].second shouldBe DeliveryResult.RetriableFailure("err1") - results[2].first shouldBe entries[2] - results[2].second shouldBe DeliveryResult.PermanentFailure("err2") + results[0].entry shouldBe entries[0] + results[0].result shouldBe DeliveryResult.Success + results[1].entry shouldBe entries[1] + results[1].result shouldBe DeliveryResult.RetriableFailure("err1") + results[2].entry shouldBe entries[2] + results[2].result shouldBe DeliveryResult.PermanentFailure("err2") } test("default deliverBatch on empty input returns empty list without calling deliver") { diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transport/KafkaTransportIntegrationTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transport/KafkaTransportIntegrationTest.kt index d591cf9..7fd2dc5 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transport/KafkaTransportIntegrationTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transport/KafkaTransportIntegrationTest.kt @@ -17,17 +17,17 @@ import java.util.UUID class KafkaTransportIntegrationTest : FunSpec({ val kafka = KafkaTestSupport() - var producer: KafkaProducer? = null + lateinit var producer: KafkaProducer lateinit var deliverer: KafkaMessageDeliverer beforeSpec { kafka.start() producer = kafka.createProducer() - deliverer = KafkaMessageDeliverer(producer!!) + deliverer = KafkaMessageDeliverer(producer) } afterSpec { - producer?.close() + producer.close() kafka.stop() } @@ -121,4 +121,35 @@ class KafkaTransportIntegrationTest : FunSpec({ result shouldBe DeliveryResult.Success } + + test("deliverBatch sends all entries to topic and returns Success in input order") { + val topic = "batch-topic-${UUID.randomUUID()}" + val entries = (0 until 25).map { i -> + entryWithInfo(topic = topic, payload = """{"seq":$i}""") + } + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe entries.size + results.forEachIndexed { i, (entry, result) -> + entry.outboxId shouldBe entries[i].outboxId + result shouldBe DeliveryResult.Success + } + + val consumer = kafka.createConsumer(groupId = "test-batch-${UUID.randomUUID()}") + consumer.subscribe(listOf(topic)) + val received = mutableListOf() + val deadline = Instant.now().plusSeconds(15) + while (received.size < entries.size && Instant.now().isBefore(deadline)) { + consumer.poll(Duration.ofSeconds(2)).forEach { received.add(it.value()) } + } + consumer.close() + + received.size shouldBe entries.size + } + + test("deliverBatch on empty input returns empty list without contacting broker") { + val results = deliverer.deliverBatch(emptyList()) + results.size shouldBe 0 + } }) diff --git a/okapi-kafka/build.gradle.kts b/okapi-kafka/build.gradle.kts index aa4133a..6575710 100644 --- a/okapi-kafka/build.gradle.kts +++ b/okapi-kafka/build.gradle.kts @@ -9,6 +9,7 @@ dependencies { api(project(":okapi-core")) implementation(libs.jacksonModuleKotlin) implementation(libs.jacksonDatatypeJsr310) + implementation(libs.slf4jApi) api(libs.kafkaClients) testImplementation(libs.kotestRunnerJunit5) diff --git a/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt b/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt index 31b99b2..b1d8796 100644 --- a/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt +++ b/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt @@ -1,45 +1,133 @@ package com.softwaremill.okapi.kafka +import com.softwaremill.okapi.core.DeliveryOutcome import com.softwaremill.okapi.core.DeliveryResult import com.softwaremill.okapi.core.MessageDeliverer import com.softwaremill.okapi.core.OutboxEntry import org.apache.kafka.clients.producer.Producer import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.RecordMetadata +import org.apache.kafka.common.errors.InterruptException import org.apache.kafka.common.errors.RetriableException +import org.slf4j.LoggerFactory import java.util.concurrent.ExecutionException +import java.util.concurrent.Future +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException /** * [MessageDeliverer] that publishes outbox entries to Kafka topics. * - * Uses the provided [Producer] to send records synchronously. - * Kafka [RetriableException]s map to [DeliveryResult.RetriableFailure]; - * all other errors map to [DeliveryResult.PermanentFailure]. + * Exception classification: + * - Kafka [RetriableException] → [DeliveryResult.RetriableFailure] + * - Thread interrupts ([InterruptException], [InterruptedException]) → + * [DeliveryResult.RetriableFailure] (interrupt flag restored) + * - all other errors → [DeliveryResult.PermanentFailure] */ class KafkaMessageDeliverer( private val producer: Producer, ) : MessageDeliverer { override val type: String = KafkaDeliveryInfo.TYPE - override fun deliver(entry: OutboxEntry): DeliveryResult { - val info = KafkaDeliveryInfo.deserialize(entry.deliveryMetadata) - val record = - ProducerRecord(info.topic, info.partitionKey, entry.payload).apply { - info.headers.forEach { (k, v) -> headers().add(k, v.toByteArray()) } - } + override fun deliver(entry: OutboxEntry): DeliveryResult = try { + producer.send(buildRecord(entry)).get() + DeliveryResult.Success + } catch (e: ExecutionException) { + classifyExecutionException(e) + } catch (e: Exception) { + classifyException(e) + } + + /** + * Uses fire-flush-await: send all entries, then a single `flush()` (which + * bypasses `linger.ms`), then collect outcomes via non-blocking `Future.get()` + * since completion is already settled. A failing `send()` does not abort the + * batch; the result list mirrors input order. + * + * If `flush()` itself fails (interrupt, fatal producer state), per-entry + * futures still surface their own exception via `get()` and are classified + * individually — the batch as a whole is never abandoned. + */ + override fun deliverBatch(entries: List): List { + if (entries.isEmpty()) return emptyList() + + val inflight: List> = entries.map { entry -> + entry to fireOne(entry) + } + + try { + producer.flush() + } catch (e: InterruptException) { + Thread.currentThread().interrupt() + logger.warn("Kafka producer.flush() interrupted; per-entry futures will surface the cause", e) + } catch (e: Exception) { + logger.warn("Kafka producer.flush() failed for batch of {}; classifying per-entry from future state", entries.size, e) + } + + return inflight.map { (entry, outcome) -> DeliveryOutcome(entry, awaitOne(outcome)) } + } + + private fun fireOne(entry: OutboxEntry): SendOutcome = try { + SendOutcome.Sent(producer.send(buildRecord(entry))) + } catch (e: Exception) { + val classified = classifyException(e) + logger.debug("Kafka send rejected synchronously for entry {}", entry.outboxId, e) + SendOutcome.ImmediateFailure(classified) + } - return try { - producer.send(record).get() + private fun awaitOne(outcome: SendOutcome): DeliveryResult = when (outcome) { + is SendOutcome.ImmediateFailure -> outcome.result + is SendOutcome.Sent -> try { + outcome.future.get(AWAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS) DeliveryResult.Success } catch (e: ExecutionException) { - classifyException(e.cause ?: e) + classifyExecutionException(e) + } catch (e: TimeoutException) { + // After flush() returns normally, futures are settled per Kafka contract and get() is + // non-blocking. Reaching this branch means flush() failed (non-Interrupt) AND the future + // is still pending — Kafka's own delivery.timeout.ms would eventually fire, but we + // refuse to stall the processor thread that long. RetriableFailure → outbox retries. + DeliveryResult.RetriableFailure("Settlement timeout after ${AWAIT_TIMEOUT_MS}ms") } catch (e: Exception) { classifyException(e) } } - private fun classifyException(e: Throwable): DeliveryResult = if (e is RetriableException) { - DeliveryResult.RetriableFailure(e.message ?: "Retriable Kafka error") - } else { - DeliveryResult.PermanentFailure(e.message ?: "Permanent Kafka error") + private fun buildRecord(entry: OutboxEntry): ProducerRecord { + val info = KafkaDeliveryInfo.deserialize(entry.deliveryMetadata) + return ProducerRecord(info.topic, info.partitionKey, entry.payload).apply { + info.headers.forEach { (k, v) -> headers().add(k, v.toByteArray()) } + } + } + + private fun classifyException(e: Throwable): DeliveryResult { + val message = e.message ?: e.javaClass.simpleName + return when { + e is InterruptException || e is InterruptedException -> { + Thread.currentThread().interrupt() + DeliveryResult.RetriableFailure(message) + } + e is RetriableException -> DeliveryResult.RetriableFailure(message) + else -> DeliveryResult.PermanentFailure(message) + } + } + + private fun classifyExecutionException(e: ExecutionException): DeliveryResult = e.cause?.let { classifyException(it) } + ?: DeliveryResult.RetriableFailure("ExecutionException with no cause") + + private sealed interface SendOutcome { + data class Sent(val future: Future) : SendOutcome + data class ImmediateFailure(val result: DeliveryResult) : SendOutcome + } + + companion object { + // Bounds awaitOne()'s wait per in-flight Future after flush(). Successful flush() guarantees + // futures are settled (returns ~0 ms); this only matters if flush() failed with a non-Interrupt + // exception and a Future stayed incomplete. 5 s is short enough to keep processor throughput + // bounded under disaster (max 5 s × batchSize stall) while letting healthy late-settling + // futures complete. + private const val AWAIT_TIMEOUT_MS = 5_000L + + private val logger = LoggerFactory.getLogger(KafkaMessageDeliverer::class.java) } } diff --git a/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererBatchTest.kt b/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererBatchTest.kt new file mode 100644 index 0000000..bf68970 --- /dev/null +++ b/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererBatchTest.kt @@ -0,0 +1,253 @@ +package com.softwaremill.okapi.kafka + +import com.softwaremill.okapi.core.DeliveryOutcome +import com.softwaremill.okapi.core.DeliveryResult +import com.softwaremill.okapi.core.OutboxEntry +import com.softwaremill.okapi.core.OutboxMessage +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.shouldBe +import io.kotest.matchers.string.shouldContain +import io.kotest.matchers.types.shouldBeInstanceOf +import org.apache.kafka.clients.producer.MockProducer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.errors.AuthenticationException +import org.apache.kafka.common.errors.InterruptException +import org.apache.kafka.common.errors.NetworkException +import org.apache.kafka.common.serialization.StringSerializer +import java.time.Instant +import java.util.concurrent.Executors +import java.util.concurrent.Future +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException + +private fun entry(suffix: String, metadataOverride: String? = null): OutboxEntry { + val info = kafkaDeliveryInfo { topic = "topic-$suffix" } + val baseEntry = OutboxEntry.createPending(OutboxMessage("evt-$suffix", """{"k":"v-$suffix"}"""), info, Instant.now()) + return if (metadataOverride != null) baseEntry.copy(deliveryMetadata = metadataOverride) else baseEntry +} + +class KafkaMessageDelivererBatchTest : FunSpec({ + test("deliverBatch on empty input returns empty list and does not invoke producer") { + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + val deliverer = KafkaMessageDeliverer(producer) + + deliverer.deliverBatch(emptyList()) shouldBe emptyList() + producer.history().size shouldBe 0 + } + + test("deliverBatch with all-success preserves input order and reports all entries delivered") { + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b"), entry("c")) + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe 3 + results.map { it.entry } shouldBe entries + results.forEach { (_, r) -> r shouldBe DeliveryResult.Success } + producer.history().size shouldBe 3 + } + + test("deliverBatch fires all sends before flushing — single flush call") { + // MockProducer with autoComplete=false: futures stay pending until completeNext/errorNext, + // so flush() is the only way settlement can happen — verifies the fire-flush-await sequence. + var flushCount = 0 + val producer = object : MockProducer(false, null, StringSerializer(), StringSerializer()) { + override fun flush() { + flushCount++ + while (completeNext()) Unit + } + } + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b"), entry("c")) + + val results = deliverer.deliverBatch(entries) + + flushCount shouldBe 1 + results.forEach { (_, r) -> r shouldBe DeliveryResult.Success } + } + + test("deliverBatch maps synchronous PermanentFailure for all entries when sendException is global") { + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + producer.sendException = AuthenticationException("bad creds") + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b")) + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe 2 + results.forEach { (_, r) -> + r.shouldBeInstanceOf() + (r as DeliveryResult.PermanentFailure).error shouldContain "bad creds" + } + } + + test("deliverBatch maps synchronous RetriableFailure when send throws RetriableException") { + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + producer.sendException = NetworkException("broker temporarily unreachable") + val deliverer = KafkaMessageDeliverer(producer) + + val results = deliverer.deliverBatch(listOf(entry("a"))) + + results[0].result.shouldBeInstanceOf() + } + + test("deliverBatch with future-based RetriableException classifies as RetriableFailure") { + // Drive mixed outcomes from inside flush(): entry 0 completes OK, entry 1 errors. + val producer = object : MockProducer(false, null, StringSerializer(), StringSerializer()) { + override fun flush() { + completeNext() + errorNext(NetworkException("transient")) + } + } + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b")) + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe 2 + results[0].result shouldBe DeliveryResult.Success + results[1].result.shouldBeInstanceOf() + (results[1].result as DeliveryResult.RetriableFailure).error shouldContain "transient" + } + + test("deliverBatch handles mixed sync-throw + async outcomes in one batch with positional integrity") { + // Throw synchronously on the 2nd send only; first goes async-success, third goes async-fail. + val producer = object : MockProducer(false, null, StringSerializer(), StringSerializer()) { + private var sendCount = 0 + + override fun send(record: ProducerRecord): Future { + sendCount++ + if (sendCount == 2) throw AuthenticationException("forbidden on send #$sendCount") + return super.send(record) + } + + override fun flush() { + completeNext() // entry 0 -> Success + errorNext(NetworkException("async fail")) // entry 2 -> Retriable + } + } + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b"), entry("c")) + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe 3 + // Positional integrity: result[i] corresponds to entries[i] regardless of outcome variant + results.map { it.entry } shouldBe entries + + results[0].result shouldBe DeliveryResult.Success + results[1].result.shouldBeInstanceOf() + (results[1].result as DeliveryResult.PermanentFailure).error shouldContain "forbidden" + results[2].result.shouldBeInstanceOf() + (results[2].result as DeliveryResult.RetriableFailure).error shouldContain "async fail" + } + + test("deliverBatch poison-pill metadata yields PermanentFailure for bad entry, others unaffected") { + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + val deliverer = KafkaMessageDeliverer(producer) + val good1 = entry("good1") + val poisoned = entry("bad", metadataOverride = "{not valid kafka info json}") + val good2 = entry("good2") + + val results = deliverer.deliverBatch(listOf(good1, poisoned, good2)) + + results.size shouldBe 3 + results.map { it.entry } shouldBe listOf(good1, poisoned, good2) + results[0].result shouldBe DeliveryResult.Success + results[1].result.shouldBeInstanceOf() + results[2].result shouldBe DeliveryResult.Success + // Only the good entries actually reached the producer + producer.history().size shouldBe 2 + } + + test("deliverBatch survives flush throwing non-Interrupt exception by classifying per-entry futures") { + // Flush blows up; each per-entry future has been settled by completeNext/errorNext just before. + // Contract: deliverBatch never re-throws — it always returns one DeliveryResult per input entry. + val producer = object : MockProducer(false, null, StringSerializer(), StringSerializer()) { + override fun flush() { + completeNext() + errorNext(NetworkException("via future")) + throw IllegalStateException("producer fatally borked") + } + } + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b")) + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe 2 + results[0].result shouldBe DeliveryResult.Success + results[1].result.shouldBeInstanceOf() + } + + test("deliverBatch interrupted during flush re-arms interrupt flag and classifies pending futures as Retriable") { + // flush() throws Kafka's InterruptException without settling the futures. Our awaitOne + // then encounters Future.get() on an interrupted thread; for incomplete futures this + // raises InterruptedException which we explicitly classify as RetriableFailure (so the + // outbox reschedules instead of marking PermanentFailure). + val producer = object : MockProducer(false, null, StringSerializer(), StringSerializer()) { + override fun flush() { + throw InterruptException("interrupted") + } + } + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a")) + + val results: List + try { + results = deliverer.deliverBatch(entries) + } finally { + // Drain the interrupt status so it doesn't leak to the next test (Thread.interrupted clears it). + Thread.interrupted() + } + + results.size shouldBe 1 + results[0].result.shouldBeInstanceOf() + } + + test("deliverBatch with non-Interrupt flush() failure and pending futures returns Retriable per entry (does not hang)") { + // If flush() throws a non-Interrupt exception (e.g. IllegalStateException from a broken + // producer state) and the futures stay pending, awaitOne() must NOT bare-block on + // Future.get(). Each get() is bounded by AWAIT_TIMEOUT_MS so the processor thread is + // released within a defined budget and outbox can retry the entries. + // + // The test runs deliverBatch on a separate thread guarded by an outer timeout — if the + // production code hangs (e.g. someone removes the get(timeout) defense), the test fails + // fast rather than wedging the test JVM. + val producer = object : MockProducer(false, null, StringSerializer(), StringSerializer()) { + override fun flush() { + throw IllegalStateException("simulated non-Interrupt flush failure") + } + } + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b")) + + val executor = Executors.newSingleThreadExecutor { r -> + Thread(r, "deliverBatch-hang-probe").apply { isDaemon = true } + } + try { + val task = executor.submit> { deliverer.deliverBatch(entries) } + // 30 s outer budget: each pending future waits AWAIT_TIMEOUT_MS (5 s) × 2 entries = 10 s + // expected wall time; the rest is slack for GC / CI noise. + val results = try { + task.get(30, TimeUnit.SECONDS) + } catch (e: TimeoutException) { + task.cancel(true) + throw AssertionError( + "deliverBatch hung for >30s when flush() threw and futures stayed pending — " + + "bare Future.get() in awaitOne() needs a bounded get(timeout) defense", + ) + } + + results.size shouldBe entries.size + results.forEachIndexed { i, outcome -> + outcome.entry.outboxId shouldBe entries[i].outboxId + val retriable = outcome.result.shouldBeInstanceOf() + retriable.error shouldContain "timeout" + } + } finally { + executor.shutdownNow() + } + } +}) diff --git a/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererTest.kt b/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererTest.kt index 31f4cc4..26a3d6a 100644 --- a/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererTest.kt +++ b/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererTest.kt @@ -8,6 +8,7 @@ import io.kotest.matchers.shouldBe import io.kotest.matchers.types.shouldBeInstanceOf import org.apache.kafka.clients.producer.MockProducer import org.apache.kafka.common.errors.AuthenticationException +import org.apache.kafka.common.errors.InterruptException import org.apache.kafka.common.errors.NetworkException import org.apache.kafka.common.errors.RecordTooLargeException import org.apache.kafka.common.serialization.StringSerializer @@ -45,4 +46,33 @@ class KafkaMessageDelivererTest : FunSpec({ val deliverer = KafkaMessageDeliverer(producer) deliverer.deliver(entry()).shouldBeInstanceOf() } + + test("Kafka InterruptException on send → RetriableFailure (interrupt flag restored)") { + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + producer.sendException = InterruptException("interrupted") + val deliverer = KafkaMessageDeliverer(producer) + try { + deliverer.deliver(entry()).shouldBeInstanceOf() + Thread.currentThread().isInterrupted shouldBe true + } finally { + // Clear the interrupt flag so it doesn't leak to subsequent tests. + Thread.interrupted() + } + } + + test("Kafka InterruptException on send in deliverBatch → RetriableFailure per entry") { + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + producer.sendException = InterruptException("interrupted") + val deliverer = KafkaMessageDeliverer(producer) + try { + val results = deliverer.deliverBatch(listOf(entry(), entry())) + results.size shouldBe 2 + results.forEach { (_, result) -> + result.shouldBeInstanceOf() + } + Thread.currentThread().isInterrupted shouldBe true + } finally { + Thread.interrupted() + } + } })