Optional I/O-concurrent runtime adapter for cdc-core.
cdc-concurrent executes CDC::Core::Processor objects with Fiber-scheduler-based I/O concurrency using async. It is the I/O-bound twin of cdc-parallel.
- Ruby 3.4+
- cdc-core
- async
cdc-core
|
|-- cdc-parallel
|     CPU-bound parallelism
|
`-- cdc-concurrent
      I/O-bound concurrency
Use cdc-concurrent for processors that spend most of their time waiting on fiber-scheduler-compatible I/O:
- HTTP webhooks
- external API enrichment
- Redis publishing
- OpenSearch or Elasticsearch indexing
- S3 or object-storage writes
- async sink fanout
- database writes through compatible drivers
Use cdc-parallel for CPU-bound work such as pgoutput parsing, OID decoding, JSON parsing, diff computation, compression, and analytics calculations.
gem "cdc-concurrent"

require "cdc/core"
require "cdc/concurrent"
class WebhookProcessor < CDC::Core::Processor
  concurrent_safe!

  def process(event)
    # Perform fiber-scheduler-compatible I/O here.
    CDC::Core::ProcessorResult.success(event)
  end
end

runtime = CDC::Concurrent::Runtime.new(
  processor: WebhookProcessor.new,
  concurrency: 100,
  timeout: 5.0
)
result = runtime.process(event)
runtime.shutdown

results = runtime.process_many(events)

Results preserve input order by default. Set preserve_order: false when completion order is acceptable.
For I/O-bound throughput, process_many is the primary path. Repeated
single-event process calls still pay one Async dispatch per event, while
process_many lets the runtime overlap waits across the submitted batch.
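
The difference between input order and completion order, and the way a batch overlaps waits, can be sketched with the standard library alone. This is not the cdc-concurrent implementation: threads and `sleep` stand in for Async tasks and scheduler-compatible I/O, and `run_batch` is a hypothetical helper whose `preserve_order:` keyword only mirrors the option described above.

```ruby
# Illustrative stand-in only: threads and sleep substitute for Async tasks
# and fiber-scheduler-compatible I/O. run_batch is a hypothetical helper.
def run_batch(delays, preserve_order: true)
  completed = Queue.new

  threads = delays.each_with_index.map do |delay, index|
    Thread.new do
      sleep(delay)                  # simulated I/O wait
      completed << [index, delay]   # recorded in completion order
      [index, delay]
    end
  end

  in_input_order = threads.map(&:value) # waits for every task to finish
  return in_input_order if preserve_order

  Array.new(delays.size) { completed.pop }
end

delays = [0.3, 0.1, 0.2]

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
ordered = run_batch(delays)
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

unordered = run_batch(delays, preserve_order: false)

puts ordered.map(&:first).inspect    # input order: [0, 1, 2]
puts unordered.map(&:first).inspect  # completion order, shortest wait first
puts format("batch took %.2fs; the waits sum to %.2fs", elapsed, delays.sum)
```

Because the waits overlap, the batch finishes in roughly the time of the longest wait rather than the sum of all waits, which is the effect process_many is meant to provide for real I/O.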
result = runtime.process_transaction(transaction)

Transactions are processed event-by-event. The returned ProcessorResult#event contains the per-event results. If any event fails, the transaction result fails and carries the first error.
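
The aggregation rule above can be sketched in plain Ruby. This is an illustration under stated assumptions, not the gem's code: `EventResult`, `TransactionResult`, and `aggregate` are hypothetical stand-ins for `CDC::Core::ProcessorResult` and the runtime's internals, and their field names are guesses.

```ruby
# Hypothetical stand-ins for ProcessorResult; field names are assumptions.
EventResult = Struct.new(:event, :error, keyword_init: true) do
  def success? = error.nil?
end

TransactionResult = Struct.new(:event, :error, keyword_init: true) do
  def success? = error.nil?
end

# Fold per-event results into one transaction-level result: keep every
# per-event result and surface the first error if any event failed.
def aggregate(event_results)
  first_failure = event_results.find { |result| !result.success? }
  TransactionResult.new(event: event_results, error: first_failure&.error)
end

per_event = [
  EventResult.new(event: :insert_1),
  EventResult.new(event: :update_2, error: IOError.new("sink unavailable")),
  EventResult.new(event: :delete_3, error: IOError.new("timed out"))
]

outcome = aggregate(per_event)
puts outcome.success?       # false
puts outcome.error.message  # sink unavailable
puts outcome.event.size     # 3
```

Later failures are still available through the per-event results, so a caller that needs every error can iterate over them instead of relying on the first one.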
Only processors that declare concurrent_safe! can run in this runtime.
class SinkProcessor < CDC::Core::Processor
  concurrent_safe!
end

Unsafe processors raise:

CDC::Concurrent::UnsafeProcessorError

A concurrent-safe processor should avoid unsafe shared mutable instance state. This runtime runs tasks concurrently in one Ruby process; it does not isolate mutable objects like Ractors do.
concurrent_safe! is a declaration of processor intent, not a proof. The
runtime cannot verify that a processor avoids unsafe shared mutable state or
uses scheduler-compatible I/O.
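
A declaration of this kind is typically a class-level flag plus a guard that checks it. The sketch below shows one way that could look. `BaseProcessor`, `UnsafeProcessorError`, and `ensure_concurrent_safe!` are hypothetical names defined here for illustration; the real classes live in cdc-core and cdc-concurrent and may work differently.

```ruby
# Hypothetical sketch; not the cdc-core or cdc-concurrent source.
class UnsafeProcessorError < StandardError; end

class BaseProcessor
  # Class-level macro: records the author's declaration, nothing more.
  def self.concurrent_safe!
    @concurrent_safe = true
  end

  def self.concurrent_safe?
    @concurrent_safe == true
  end
end

class SinkProcessor < BaseProcessor
  concurrent_safe!
end

class LegacyProcessor < BaseProcessor
  # No declaration, so the guard below rejects it.
end

# Guard a runtime could apply before accepting a processor.
def ensure_concurrent_safe!(processor)
  return processor if processor.class.concurrent_safe?

  raise UnsafeProcessorError, "#{processor.class} does not declare concurrent_safe!"
end

ensure_concurrent_safe!(SinkProcessor.new)

begin
  ensure_concurrent_safe!(LegacyProcessor.new)
rescue UnsafeProcessorError => e
  puts e.message # LegacyProcessor does not declare concurrent_safe!
end
```

The guard only reads a flag, which is why the declaration cannot prove anything about the processor's behavior. In this sketch the flag is stored per class, so a subclass would have to repeat the declaration; whether the real gem inherits it is not stated here.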
cdc-concurrent improves throughput only for I/O that cooperates with Ruby's Fiber scheduler. Blocking libraries that do not yield to the scheduler will still block the process.
Timeouts are applied per event task. They are not whole-batch or whole-transaction deadlines.
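
The per-task scope of the timeout can be illustrated with a standard-library sketch. It is not the gem's implementation: threads, `sleep`, and `Timeout.timeout` stand in for Async tasks and their deadlines, fixed-size waves approximate a concurrency limit, and `process_with_timeouts` is a hypothetical helper.

```ruby
require "timeout"

# Illustrative stand-in: threads and sleep substitute for Async tasks and
# scheduler-compatible I/O. Waves of `concurrency` tasks approximate a pool.
def process_with_timeouts(delays, timeout:, concurrency:)
  delays.each_slice(concurrency).flat_map do |wave|
    wave.map do |delay|
      Thread.new do
        # The deadline wraps one task only; each task gets its own budget.
        Timeout.timeout(timeout) do
          sleep(delay) # simulated I/O wait
          :ok
        end
      rescue Timeout::Error
        :timed_out
      end
    end.map(&:value)
  end
end

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
statuses = process_with_timeouts([0.2, 1.0, 0.2, 0.2], timeout: 0.3, concurrency: 2)
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

puts statuses.inspect # [:ok, :timed_out, :ok, :ok]
puts elapsed > 0.3    # true: the batch as a whole outlives the per-task timeout
```

Only the slow task times out, and the batch as a whole still runs longer than the timeout value. A caller that needs a whole-batch or whole-transaction deadline has to enforce it separately.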
For CPU-bound processing, use cdc-parallel.
- Move concurrent_safe! into cdc-core
- Retry and backoff policies
- Dead-letter handling
- Async HTTP webhook helpers
- Sink abstractions
- Async Redis/OpenSearch integrations
The test suite is grouped by intent so the same structure can be reused across CDC ecosystem gems.
test/unit/         focused class and branch coverage
test/integration/  component interaction and runtime integration
test/behavior/     ecosystem contracts and guardrails
test/performance/  opt-in smoke benchmarks
Run the default quality suite:
bundle exec rake test

Run a specific group:
bundle exec rake test:unit
bundle exec rake test:integration
bundle exec rake test:behavior
bundle exec rake test:performance

The default test task runs unit, integration, and behavior tests. Performance tests are intentionally separate because they are environment-sensitive.
cdc-concurrent includes reproducible benchmarks that compare serial processor execution against the Async-backed processor pool.
The benchmark focuses on three workload categories:
| Workload | Purpose |
|---|---|
| tiny | Measure dispatch overhead |
| io | Measure scheduler-friendly I/O concurrency |
| batch | Measure batched CDC event I/O fanout |
See benchmark/README.md for the full benchmark methodology, configuration reference, report schema, and interpretation guidance.
cdc-parallel and cdc-concurrent benchmark different bottlenecks.
cdc-parallel measures CPU parallelism; cdc-concurrent measures I/O wait
overlap. Their speedup ratios are not directly comparable.
Default I/O workload:
bundle exec rake benchmark:processor_pool

Tiny overhead workload:
BENCHMARK_WORKLOAD=tiny \
  bundle exec rake benchmark:processor_pool

Batch workload:
BENCHMARK_WORKLOAD=batch \
  BENCHMARK_BATCH_SIZE=1000 \
  bundle exec rake benchmark:processor_pool

Concurrency sweep:
BENCHMARK_WORKLOAD=io \
  BENCHMARK_CONCURRENCY_COUNTS=1,10,50,100 \
  bundle exec rake benchmark:processor_pool

Credibility controls:
BENCHMARK_TRIALS=7 \
  BENCHMARK_MIN_DURATION=0.25 \
  BENCHMARK_ITERATIONS=1000 \
  bundle exec rake benchmark:processor_pool

Build and run the reusable Docker image:
bundle exec rake benchmark:docker_build
bundle exec rake benchmark:docker_run

Or run the image directly after it is published to GitHub Container Registry:
docker run --rm ghcr.io/kanutocd/cdc-concurrent-benchmark:main

The benchmark image is intended to follow the shared performance validation pattern across CDC Ecosystem gems, enabling reproducible benchmark execution locally, in CI, and across different development environments.
MIT.