diff --git a/README.md b/README.md index 1130bb2..52f7887 100644 --- a/README.md +++ b/README.md @@ -81,6 +81,17 @@ springOutboxPublisher.publish( > **Note:** Spring and Kafka versions are not forced by okapi — you control them. > Okapi uses plain JDBC internally — it works with any `PlatformTransactionManager` (JPA, JDBC, jOOQ, Exposed, etc.). +`okapi-spring-boot` requires a `TransactionRunner` bean to bracket each scheduler tick in a transaction. The autoconfiguration derives one from any `PlatformTransactionManager` on the classpath (`spring-boot-starter-jdbc` or `spring-boot-starter-data-jpa` provide one out of the box) — no extra wiring needed in typical setups. If your application has no `PlatformTransactionManager` (single-instance, no transaction infrastructure) you must opt in explicitly: + +```kotlin +@Bean +fun outboxTransactionRunner(): TransactionRunner = object : TransactionRunner { + override fun runInTransaction(block: () -> T): T = block() +} +``` + +Without bracketing, `FOR UPDATE SKIP LOCKED` collapses to the single SELECT statement under JDBC auto-commit, which silently allows duplicate delivery across processor instances. This opt-in is intentionally manual to keep accidental misconfiguration out of multi-instance deployments. + ## How It Works Okapi implements the [transactional outbox pattern](https://softwaremill.com/microservices-101/) (see also: [microservices.io description](https://microservices.io/patterns/data/transactional-outbox.html)): diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 3f5e36b..93705c8 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -20,6 +20,9 @@ h2 = "2.4.240" micrometer = "1.16.5" jmh = "1.37" jmhGradlePlugin = "0.7.3" +# Hibernate version aligned with what Spring 7.x targets (Hibernate 7.0 ORM); only used in the +# integration-tests module to exercise JpaTransactionManager fail-fast extraction. +hibernate = "7.1.4.Final" [libraries] kotlinGradlePlugin = { module = "org.jetbrains.kotlin:kotlin-gradle-plugin", version.ref = "kotlin" } @@ -44,10 +47,14 @@ kafkaClients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafka springContext = { module = "org.springframework:spring-context", version.ref = "spring" } springTx = { module = "org.springframework:spring-tx", version.ref = "spring" } springJdbc = { module = "org.springframework:spring-jdbc", version.ref = "spring" } +springOrm = { module = "org.springframework:spring-orm", version.ref = "spring" } +hibernateCore = { module = "org.hibernate.orm:hibernate-core", version.ref = "hibernate" } springBootAutoconfigure = { module = "org.springframework.boot:spring-boot-autoconfigure", version.ref = "springBoot" } springBootStarterValidation = { module = "org.springframework.boot:spring-boot-starter-validation", version.ref = "springBoot" } springBootTest = { module = "org.springframework.boot:spring-boot-test", version.ref = "springBoot" } springBootStarterActuator = { module = "org.springframework.boot:spring-boot-starter-actuator", version.ref = "springBoot" } +# Spring Boot 4.0 split TransactionAutoConfiguration into a dedicated module (was in spring-boot-autoconfigure in 3.x). +springBootTransaction = { module = "org.springframework.boot:spring-boot-transaction", version.ref = "springBoot" } assertjCore = { module = "org.assertj:assertj-core", version.ref = "assertj" } micrometerCore = { module = "io.micrometer:micrometer-core", version.ref = "micrometer" } micrometerTest = { module = "io.micrometer:micrometer-test", version.ref = "micrometer" } diff --git a/okapi-integration-tests/build.gradle.kts b/okapi-integration-tests/build.gradle.kts index 10e1a24..018fd05 100644 --- a/okapi-integration-tests/build.gradle.kts +++ b/okapi-integration-tests/build.gradle.kts @@ -44,5 +44,18 @@ dependencies { testImplementation(libs.springContext) testImplementation(libs.springTx) testImplementation(libs.springBootAutoconfigure) + testImplementation(libs.springBootTest) testImplementation(libs.springJdbc) + // Spring Boot 4.x doesn't pull AssertJ transitively but ApplicationContextRunner needs it + testImplementation(libs.assertjCore) + + // Exposed-Spring bridge (proves autoconfig works with non-DataSourceTransactionManager PTMs) + testImplementation(libs.exposedCore) + testImplementation(libs.exposedJdbc) + testImplementation(libs.exposedSpringTransaction) + + // JPA + Hibernate — proves extractDataSource() pulls JpaTransactionManager.getDataSource() + // and validatePtmDataSourceMatch fails fast on PTM↔DataSource mismatch under JPA. + testImplementation(libs.springOrm) + testImplementation(libs.hibernateCore) } diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/PostgresTestSupport.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/PostgresTestSupport.kt index 5a5b371..7bb8d18 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/PostgresTestSupport.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/support/PostgresTestSupport.kt @@ -37,10 +37,26 @@ class PostgresTestSupport { } } - private fun runLiquibase() { - val connection = DriverManager.getConnection(container.jdbcUrl, container.username, container.password) - val db = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(JdbcConnection(connection)) - Liquibase("com/softwaremill/okapi/db/postgres/changelog.xml", ClassLoaderResourceAccessor(), db).use { it.update("") } - connection.close() + private fun runLiquibase() = runOkapiLiquibaseOn(container) +} + +/** + * Applies okapi's PostgreSQL Liquibase changelog to the given container. For tests that manage + * their own PostgreSQL containers (e.g. 2-DataSource setups) and can't use the single-container + * `PostgresTestSupport` class. + */ +fun runOkapiLiquibaseOn(container: PostgreSQLContainer) { + DriverManager.getConnection(container.jdbcUrl, container.username, container.password).use { conn -> + val db = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(JdbcConnection(conn)) + Liquibase("com/softwaremill/okapi/db/postgres/changelog.xml", ClassLoaderResourceAccessor(), db).use { + it.update("") + } } } + +/** Builds a plain `PGSimpleDataSource` pointing at the given container. */ +fun pgDataSourceOf(container: PostgreSQLContainer): DataSource = PGSimpleDataSource().apply { + setURL(container.jdbcUrl) + user = container.username + password = container.password +} diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/ExposedSpringBridgeEndToEndTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/ExposedSpringBridgeEndToEndTest.kt new file mode 100644 index 0000000..aa8d48f --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/ExposedSpringBridgeEndToEndTest.kt @@ -0,0 +1,243 @@ +package com.softwaremill.okapi.test.transaction + +import com.softwaremill.okapi.core.DeliveryInfo +import com.softwaremill.okapi.core.MessageDeliverer +import com.softwaremill.okapi.core.OutboxMessage +import com.softwaremill.okapi.core.OutboxProcessor +import com.softwaremill.okapi.core.TransactionRunner +import com.softwaremill.okapi.postgres.PostgresOutboxStore +import com.softwaremill.okapi.springboot.OkapiLiquibaseAutoConfiguration +import com.softwaremill.okapi.springboot.OutboxAutoConfiguration +import com.softwaremill.okapi.springboot.SpringConnectionProvider +import com.softwaremill.okapi.springboot.SpringOutboxPublisher +import com.softwaremill.okapi.springboot.SpringTransactionRunner +import com.softwaremill.okapi.test.support.CountingDataSource +import com.softwaremill.okapi.test.support.RecordingMessageDeliverer +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.shouldBe +import io.kotest.matchers.types.shouldBeInstanceOf +import org.jetbrains.exposed.v1.spring7.transaction.SpringTransactionManager +import org.postgresql.ds.PGSimpleDataSource +import org.springframework.boot.autoconfigure.AutoConfigurations +import org.springframework.boot.test.context.runner.ApplicationContextRunner +import org.springframework.transaction.PlatformTransactionManager +import org.springframework.transaction.support.TransactionTemplate +import org.testcontainers.containers.PostgreSQLContainer +import java.time.Clock +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CyclicBarrier +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import javax.sql.DataSource + +/** + * Proves [OutboxAutoConfiguration]'s `okapiTransactionRunner` bean factory works with a + * non-`DataSourceTransactionManager` `PlatformTransactionManager` — specifically Exposed's + * `SpringTransactionManager` bridge. If the autoconfig assumed DST, this test would fail. + * + * Two assertions: + * 1. With processor disabled: a single Spring TX wrapping `springOutboxPublisher.publish()` + * borrows exactly one physical connection from the pool. This proves the autoconfig-built + * runner correctly bridges to `SpringConnectionProvider` through Spring's `ConnectionHolder`. + * 2. With processor enabled (200ms tick): an entry published inside a Spring TX is later + * delivered by the background scheduler — proving each scheduler tick is itself bracketed + * by a Spring TX driven by the bridged PTM. + * + * Liquibase schema migration is handled by `okapi-spring-boot` autoconfiguration; no manual + * setup needed. + */ +class ExposedSpringBridgeEndToEndTest : FunSpec({ + + val container = PostgreSQLContainer("postgres:16") + lateinit var counter: CountingDataSource + + beforeSpec { + container.start() + val raw = PGSimpleDataSource().apply { + setURL(container.jdbcUrl) + user = container.username + password = container.password + } + counter = CountingDataSource(raw) + } + + afterSpec { container.stop() } + + // Both okapi-postgres and okapi-mysql are on the test classpath (shared integration-tests + // module). Explicitly register PostgresOutboxStore so the autoconfig's MySQL path is + // unambiguously skipped — otherwise MySQL's `FOR UPDATE SKIP LOCKED` with `FORCE INDEX` + // hint would fail on Postgres. + fun runner(recorder: RecordingMessageDeliverer): ApplicationContextRunner = ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(OutboxAutoConfiguration::class.java, OkapiLiquibaseAutoConfiguration::class.java)) + .withBean(DataSource::class.java, { counter as DataSource }) + .withBean(MessageDeliverer::class.java, { recorder }) + .withBean(PlatformTransactionManager::class.java, { SpringTransactionManager(counter) }) + .withBean(PostgresOutboxStore::class.java, { + PostgresOutboxStore(SpringConnectionProvider(counter), Clock.systemUTC()) + }) + + test("publish inside Spring TX driven by Exposed-bridge PTM uses a single physical connection") { + // Disable processor only — purger is left at its default 1h interval so it never ticks + // during this test, but its enabled=true keeps `okapiTransactionRunner` factory active + // (the factory is gated on at least one scheduler being enabled). + runner(RecordingMessageDeliverer()) + .withPropertyValues("okapi.processor.enabled=false") + .run { ctx -> + ctx.getBean(TransactionRunner::class.java).shouldBeInstanceOf() + + resetCounterAndTruncate(counter) + + val publisher = ctx.getBean(SpringOutboxPublisher::class.java) + val tm = ctx.getBean(PlatformTransactionManager::class.java) + + TransactionTemplate(tm).execute { + publisher.publish( + OutboxMessage("order.created", """{"orderId":"abc-123"}"""), + RecordingDeliveryInfo, + ) + } + + counter.opened.get() shouldBe counter.closed.get() + counter.opened.get() shouldBe 1 + } + } + + test("processor tick under Exposed-bridge PTM claims and delivers a published entry") { + val recorder = RecordingMessageDeliverer() + runner(recorder) + .withPropertyValues("okapi.processor.interval=200ms") + .run { ctx -> + resetCounterAndTruncate(counter) + + val publisher = ctx.getBean(SpringOutboxPublisher::class.java) + val tm = ctx.getBean(PlatformTransactionManager::class.java) + + TransactionTemplate(tm).execute { + publisher.publish( + OutboxMessage("order.created", """{"orderId":"xyz-789"}"""), + RecordingDeliveryInfo, + ) + } + + val deadline = System.currentTimeMillis() + 5_000 + while (recorder.deliveryCount() == 0 && System.currentTimeMillis() < deadline) { + Thread.sleep(50) + } + recorder.deliveryCount() shouldBe 1 + } + } + + // The single-process happy-path tests above would silently pass even if the autoconfig had + // re-introduced the auto-commit fallback (the bug KOJAK-67 fixes). This test exercises + // contention: 5 concurrent processor invocations against 50 published entries, each tick + // bracketed by the autoconfig-built TransactionRunner. With proper TX bracketing the + // FOR UPDATE SKIP LOCKED rows stay locked across claim+update — no amplification. With a + // no-op TR (or auto-commit fallback) the lock is released between claim and update, + // multiple processors deliver the same entry, and `assertNoAmplification` throws. + test("autoconfig-built TransactionRunner prevents delivery amplification under concurrent processor invocations") { + val recorder = RecordingMessageDeliverer() + runner(recorder) + // Disable processor only — purger stays at its default 1h interval (won't fire in test) + // but keeps `okapiTransactionRunner` factory active. + .withPropertyValues("okapi.processor.enabled=false") + .run { ctx -> + resetCounterAndTruncate(counter) + + val publisher = ctx.getBean(SpringOutboxPublisher::class.java) + val tm = ctx.getBean(PlatformTransactionManager::class.java) + val processor = ctx.getBean(OutboxProcessor::class.java) + val transactionRunner = ctx.getBean(TransactionRunner::class.java) + + val entryCount = 50 + val processorCount = 5 + + repeat(entryCount) { i -> + TransactionTemplate(tm).execute { + publisher.publish( + OutboxMessage("test.event", """{"i":$i}"""), + RecordingDeliveryInfo, + ) + } + } + + val barrier = CyclicBarrier(processorCount) + val executor = Executors.newVirtualThreadPerTaskExecutor() + val futures = (1..processorCount).map { + CompletableFuture.supplyAsync( + { + barrier.await(10, TimeUnit.SECONDS) + transactionRunner.runInTransaction { processor.processNext(entryCount) } + }, + executor, + ) + } + CompletableFuture.allOf(*futures.toTypedArray()).get(30, TimeUnit.SECONDS) + executor.shutdown() + + recorder.assertNoAmplification() + recorder.deliveryCount() shouldBe entryCount + } + } + + // Purger uses a different code path than the processor — native SQL delete with limit, no + // claim/update state machine. Under the Exposed `SpringTransactionManager` bridge this needs + // its own E2E coverage: a regression where the bridge mishandles bracketing of the bulk delete + // (e.g. an Exposed upgrade that changes statement execution) would silently leave DELIVERED + // rows accumulating without breaking any other test. + test("purger tick under Exposed-bridge PTM removes DELIVERED entries past retention") { + val recorder = RecordingMessageDeliverer() + runner(recorder) + .withPropertyValues( + "okapi.processor.interval=100ms", + "okapi.purger.interval=200ms", + "okapi.purger.retention=1ms", + ) + .run { ctx -> + resetCounterAndTruncate(counter) + + val publisher = ctx.getBean(SpringOutboxPublisher::class.java) + val tm = ctx.getBean(PlatformTransactionManager::class.java) + + TransactionTemplate(tm).execute { + publisher.publish( + OutboxMessage("test.purger", """{"k":"v"}"""), + RecordingDeliveryInfo, + ) + } + + val deliveredDeadline = System.currentTimeMillis() + 5_000 + while (recorder.deliveryCount() == 0 && System.currentTimeMillis() < deliveredDeadline) { + Thread.sleep(50) + } + recorder.deliveryCount() shouldBe 1 + + val purgedDeadline = System.currentTimeMillis() + 5_000 + while (rowCount(counter) > 0 && System.currentTimeMillis() < purgedDeadline) { + Thread.sleep(100) + } + rowCount(counter) shouldBe 0 + } + } +}) + +private fun rowCount(counter: CountingDataSource): Int = counter.delegate.connection.use { c -> + c.createStatement().use { stmt -> + stmt.executeQuery("SELECT COUNT(*) FROM okapi_outbox").use { rs -> + rs.next() + rs.getInt(1) + } + } +} + +private fun resetCounterAndTruncate(counter: CountingDataSource) { + counter.delegate.connection.use { c -> + c.createStatement().use { it.execute("TRUNCATE TABLE okapi_outbox") } + } + counter.opened.set(0) + counter.closed.set(0) +} + +private object RecordingDeliveryInfo : DeliveryInfo { + override val type: String = "recording" + override fun serialize(): String = """{"type":"recording"}""" +} diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/JpaTransactionManagerFailFastTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/JpaTransactionManagerFailFastTest.kt new file mode 100644 index 0000000..44ffc4f --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/JpaTransactionManagerFailFastTest.kt @@ -0,0 +1,85 @@ +package com.softwaremill.okapi.test.transaction + +import com.softwaremill.okapi.core.DeliveryResult +import com.softwaremill.okapi.core.MessageDeliverer +import com.softwaremill.okapi.core.OutboxEntry +import com.softwaremill.okapi.postgres.PostgresOutboxStore +import com.softwaremill.okapi.springboot.OkapiLiquibaseAutoConfiguration +import com.softwaremill.okapi.springboot.OutboxAutoConfiguration +import com.softwaremill.okapi.springboot.SpringConnectionProvider +import com.softwaremill.okapi.test.support.pgDataSourceOf +import com.softwaremill.okapi.test.support.runOkapiLiquibaseOn +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.nulls.shouldNotBeNull +import io.kotest.matchers.string.shouldContain +import org.springframework.beans.factory.config.BeanDefinitionCustomizer +import org.springframework.boot.autoconfigure.AutoConfigurations +import org.springframework.boot.test.context.runner.ApplicationContextRunner +import org.springframework.orm.jpa.JpaTransactionManager +import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean +import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter +import org.springframework.transaction.PlatformTransactionManager +import org.testcontainers.containers.PostgreSQLContainer +import javax.sql.DataSource + +/** + * Proves the JPA branch of `extractDataSource` (`OutboxAutoConfiguration.kt`): a + * `JpaTransactionManager` whose auto-detected DataSource differs from okapi's outbox DataSource + * triggers `validatePtmDataSourceMatch` fail-fast at startup. Companion to + * `WrongPtmDataSourceAmplificationProofTest`, which documents the remaining residual risk for + * PTMs that expose no DataSource at all (Exposed bridge, JTA). + */ +class JpaTransactionManagerFailFastTest : FunSpec({ + + val container = PostgreSQLContainer("postgres:16") + lateinit var dsA: DataSource + lateinit var dsB: DataSource + + beforeSpec { + container.start() + dsA = pgDataSourceOf(container) + dsB = pgDataSourceOf(container) + runOkapiLiquibaseOn(container) + } + + afterSpec { container.stop() } + + test("JpaTransactionManager bound to a different DataSource than the outbox DS fails fast at startup") { + val emf = LocalContainerEntityManagerFactoryBean().apply { + dataSource = dsA + jpaVendorAdapter = HibernateJpaVendorAdapter() + // Empty package scan creates an implicit persistence unit without requiring persistence.xml + // — we have no @Entity classes; the EMF only needs to exist so JpaTransactionManager can + // auto-detect its DataSource. + setPackagesToScan() + afterPropertiesSet() + }.`object`.shouldNotBeNull() + + ApplicationContextRunner() + .withConfiguration( + AutoConfigurations.of( + OutboxAutoConfiguration::class.java, + OkapiLiquibaseAutoConfiguration::class.java, + ), + ) + .withBean("dsB", DataSource::class.java, { dsB }, BeanDefinitionCustomizer { it.isPrimary = true }) + .withBean("dsA", DataSource::class.java, { dsA }) + .withBean("jpaTmA", PlatformTransactionManager::class.java, { JpaTransactionManager(emf) }) + .withBean(MessageDeliverer::class.java, { JpaTestStubDeliverer }) + .withBean(PostgresOutboxStore::class.java, { + PostgresOutboxStore(SpringConnectionProvider(dsB), java.time.Clock.systemUTC()) + }) + .withPropertyValues("okapi.liquibase.enabled=false") + .run { ctx -> + val failure = ctx.startupFailure + failure.shouldNotBeNull() + failure.stackTraceToString() shouldContain + "is bound to a different DataSource than okapi's outbox DataSource" + } + } +}) + +private object JpaTestStubDeliverer : MessageDeliverer { + override val type = "stub" + override fun deliver(entry: OutboxEntry) = DeliveryResult.Success +} diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/JpaTransactionManagerMatchedDataSourceTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/JpaTransactionManagerMatchedDataSourceTest.kt new file mode 100644 index 0000000..f9d7668 --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/JpaTransactionManagerMatchedDataSourceTest.kt @@ -0,0 +1,88 @@ +package com.softwaremill.okapi.test.transaction + +import com.softwaremill.okapi.core.DeliveryResult +import com.softwaremill.okapi.core.MessageDeliverer +import com.softwaremill.okapi.core.OutboxEntry +import com.softwaremill.okapi.core.TransactionRunner +import com.softwaremill.okapi.postgres.PostgresOutboxStore +import com.softwaremill.okapi.springboot.OkapiLiquibaseAutoConfiguration +import com.softwaremill.okapi.springboot.OutboxAutoConfiguration +import com.softwaremill.okapi.springboot.SpringConnectionProvider +import com.softwaremill.okapi.springboot.SpringTransactionRunner +import com.softwaremill.okapi.test.support.pgDataSourceOf +import com.softwaremill.okapi.test.support.runOkapiLiquibaseOn +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.nulls.shouldBeNull +import io.kotest.matchers.nulls.shouldNotBeNull +import io.kotest.matchers.types.shouldBeInstanceOf +import io.kotest.matchers.types.shouldBeSameInstanceAs +import org.springframework.boot.autoconfigure.AutoConfigurations +import org.springframework.boot.test.context.runner.ApplicationContextRunner +import org.springframework.orm.jpa.JpaTransactionManager +import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean +import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter +import org.springframework.transaction.PlatformTransactionManager +import org.testcontainers.containers.PostgreSQLContainer +import javax.sql.DataSource + +/** + * Companion to `JpaTransactionManagerFailFastTest`: proves the MATCH branch of the JPA path. + * When the `JpaTransactionManager`'s auto-detected DataSource equals okapi's outbox DataSource, + * `validatePtmDataSourceMatch` succeeds, the context starts, and `okapiTransactionRunner` is wired + * to a `TransactionTemplate` bound to that PTM. + * + * Without this test the fail-fast version above could theoretically pass even if a regression + * routed JPA through a wrong branch and made every JPA setup fail — match coverage is the + * other half of the proof. + */ +class JpaTransactionManagerMatchedDataSourceTest : FunSpec({ + + val container = PostgreSQLContainer("postgres:16") + lateinit var ds: DataSource + + beforeSpec { + container.start() + ds = pgDataSourceOf(container) + runOkapiLiquibaseOn(container) + } + + afterSpec { container.stop() } + + test("JpaTransactionManager bound to the SAME DataSource as the outbox: context starts cleanly") { + val emf = LocalContainerEntityManagerFactoryBean().apply { + dataSource = ds + jpaVendorAdapter = HibernateJpaVendorAdapter() + setPackagesToScan() + afterPropertiesSet() + }.`object`.shouldNotBeNull() + + val jpaPtm = JpaTransactionManager(emf) + + ApplicationContextRunner() + .withConfiguration( + AutoConfigurations.of( + OutboxAutoConfiguration::class.java, + OkapiLiquibaseAutoConfiguration::class.java, + ), + ) + .withBean(DataSource::class.java, { ds }) + .withBean("jpaTm", PlatformTransactionManager::class.java, { jpaPtm }) + .withBean(MessageDeliverer::class.java, { JpaMatchStubDeliverer }) + .withBean(PostgresOutboxStore::class.java, { + PostgresOutboxStore(SpringConnectionProvider(ds), java.time.Clock.systemUTC()) + }) + .withPropertyValues("okapi.liquibase.enabled=false") + .run { ctx -> + ctx.startupFailure.shouldBeNull() + val runner = ctx.getBean(TransactionRunner::class.java).shouldBeInstanceOf() + // The runner's TransactionTemplate must point at OUR JPA PTM, proving extractDataSource + // returned the matched DS and validatePtmDataSourceMatch's return-after-match path was taken. + runner.transactionTemplate.transactionManager shouldBeSameInstanceAs jpaPtm + } + } +}) + +private object JpaMatchStubDeliverer : MessageDeliverer { + override val type = "stub" + override fun deliver(entry: OutboxEntry) = DeliveryResult.Success +} diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/MultiDataSourceTransactionTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/MultiDataSourceTransactionTest.kt index bf6396e..c96e3a6 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/MultiDataSourceTransactionTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/MultiDataSourceTransactionTest.kt @@ -7,19 +7,15 @@ import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.postgres.PostgresOutboxStore import com.softwaremill.okapi.springboot.SpringConnectionProvider import com.softwaremill.okapi.springboot.SpringOutboxPublisher +import com.softwaremill.okapi.test.support.pgDataSourceOf +import com.softwaremill.okapi.test.support.runOkapiLiquibaseOn import io.kotest.assertions.throwables.shouldThrow import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.maps.shouldContain import io.kotest.matchers.shouldNotBe -import liquibase.Liquibase -import liquibase.database.DatabaseFactory -import liquibase.database.jvm.JdbcConnection -import liquibase.resource.ClassLoaderResourceAccessor -import org.postgresql.ds.PGSimpleDataSource import org.springframework.jdbc.datasource.DataSourceTransactionManager import org.springframework.transaction.support.TransactionTemplate import org.testcontainers.containers.PostgreSQLContainer -import java.sql.DriverManager import java.time.Clock import javax.sql.DataSource @@ -58,20 +54,11 @@ class MultiDataSourceTransactionTest : FunSpec({ outboxContainer.start() otherContainer.start() - outboxDataSource = PGSimpleDataSource().apply { - setURL(outboxContainer.jdbcUrl) - user = outboxContainer.username - password = outboxContainer.password - } - - otherDataSource = PGSimpleDataSource().apply { - setURL(otherContainer.jdbcUrl) - user = otherContainer.username - password = otherContainer.password - } + outboxDataSource = pgDataSourceOf(outboxContainer) + otherDataSource = pgDataSourceOf(otherContainer) // Run Liquibase migration only on the outbox database - runLiquibase(outboxContainer) + runOkapiLiquibaseOn(outboxContainer) val outboxTxManager = DataSourceTransactionManager(outboxDataSource) outboxTxTemplate = TransactionTemplate(outboxTxManager) @@ -135,10 +122,3 @@ class MultiDataSourceTransactionTest : FunSpec({ counts shouldContain (OutboxStatus.PENDING to 1L) } }) - -private fun runLiquibase(container: PostgreSQLContainer) { - val connection = DriverManager.getConnection(container.jdbcUrl, container.username, container.password) - val db = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(JdbcConnection(connection)) - Liquibase("com/softwaremill/okapi/db/postgres/changelog.xml", ClassLoaderResourceAccessor(), db).use { it.update("") } - connection.close() -} diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/WrongPtmDataSourceAmplificationProofTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/WrongPtmDataSourceAmplificationProofTest.kt new file mode 100644 index 0000000..9d330bf --- /dev/null +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transaction/WrongPtmDataSourceAmplificationProofTest.kt @@ -0,0 +1,131 @@ +package com.softwaremill.okapi.test.transaction + +import com.softwaremill.okapi.core.DeliveryInfo +import com.softwaremill.okapi.core.MessageDeliverer +import com.softwaremill.okapi.core.OutboxMessage +import com.softwaremill.okapi.core.OutboxProcessor +import com.softwaremill.okapi.core.TransactionRunner +import com.softwaremill.okapi.postgres.PostgresOutboxStore +import com.softwaremill.okapi.springboot.OkapiLiquibaseAutoConfiguration +import com.softwaremill.okapi.springboot.OutboxAutoConfiguration +import com.softwaremill.okapi.springboot.SpringConnectionProvider +import com.softwaremill.okapi.springboot.SpringOutboxPublisher +import com.softwaremill.okapi.test.support.RecordingMessageDeliverer +import com.softwaremill.okapi.test.support.pgDataSourceOf +import com.softwaremill.okapi.test.support.runOkapiLiquibaseOn +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.maps.shouldNotBeEmpty +import io.kotest.matchers.shouldBe +import org.jetbrains.exposed.v1.spring7.transaction.SpringTransactionManager +import org.springframework.beans.factory.config.BeanDefinitionCustomizer +import org.springframework.boot.autoconfigure.AutoConfigurations +import org.springframework.boot.test.context.runner.ApplicationContextRunner +import org.springframework.jdbc.datasource.DataSourceTransactionManager +import org.springframework.transaction.PlatformTransactionManager +import org.springframework.transaction.support.TransactionTemplate +import org.testcontainers.containers.PostgreSQLContainer +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CyclicBarrier +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import javax.sql.DataSource + +/** + * Documents the residual silent-failure risk for non-extractable PTMs (JTA, Exposed bridge, + * any `PlatformTransactionManager` that exposes neither a `DataSource` resourceFactory nor a + * public `getDataSource()`) when the user wires the outbox to a different DataSource than the + * PTM and does NOT set `okapi.transaction-manager-qualifier`. + * + * `validatePtmDataSourceMatch` can fail-fast for `DataSourceTransactionManager`, + * `JpaTransactionManager`, and `HibernateTransactionManager` (see `extractDataSource`). For the + * remaining PTM families it can only emit a WARN — Spring exposes no public API to derive the + * bound DataSource. This test pins down what "WARN only" empirically means: concurrent processors + * see fully unlocked rows and amplify delivery. + * + * Asserts amplification DID happen (50/50 entries delivered more than once is the typical run). + * If a future change adds extraction support for the Exposed bridge — or pessimistic locking + * starts holding across the spurious auto-commit — this test will fail and force a re-evaluation. + */ +class WrongPtmDataSourceAmplificationProofTest : FunSpec({ + + val dsAContainer = PostgreSQLContainer("postgres:16") + val dsBContainer = PostgreSQLContainer("postgres:16") + + lateinit var dsA: DataSource + lateinit var dsB: DataSource + + beforeSpec { + dsAContainer.start() + dsBContainer.start() + dsA = pgDataSourceOf(dsAContainer) + dsB = pgDataSourceOf(dsBContainer) + // Migrate both: DS-B holds the outbox table the processor reads; DS-A would have it if + // okapi had picked it. Keeping parity rules out "table missing" as a cause of zero deliveries. + runOkapiLiquibaseOn(dsAContainer) + runOkapiLiquibaseOn(dsBContainer) + } + + afterSpec { + dsAContainer.stop() + dsBContainer.stop() + } + + test("non-extractable PTM bound to wrong DataSource permits delivery amplification") { + val recorder = RecordingMessageDeliverer() + + // Publish 50 entries to DS-B via a CORRECTLY-wired DST(DS-B) publisher. + val publishTpl = TransactionTemplate(DataSourceTransactionManager(dsB)) + val publishStore = PostgresOutboxStore(SpringConnectionProvider(dsB), java.time.Clock.systemUTC()) + val publisher = SpringOutboxPublisher( + delegate = com.softwaremill.okapi.core.OutboxPublisher(publishStore, java.time.Clock.systemUTC()), + dataSource = dsB, + ) + repeat(50) { i -> + publishTpl.execute { + publisher.publish(OutboxMessage("test.event", """{"i":$i}"""), AmplificationDeliveryInfo) + } + } + + ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(OutboxAutoConfiguration::class.java, OkapiLiquibaseAutoConfiguration::class.java)) + // DS-B as @Primary so resolveDataSource() picks it as the outbox DS. + .withBean("dsB", DataSource::class.java, { dsB }, BeanDefinitionCustomizer { it.isPrimary = true }) + .withBean("dsA", DataSource::class.java, { dsA }) + // Exposed SpringTransactionManager bound to DS-A — non-extractable: validatePtmDataSourceMatch + // logs a WARN and proceeds (the silent-failure setup this test documents). + .withBean("exposedTmA", PlatformTransactionManager::class.java, { SpringTransactionManager(dsA) }) + .withBean(MessageDeliverer::class.java, { recorder }) + .withBean(PostgresOutboxStore::class.java, { + PostgresOutboxStore(SpringConnectionProvider(dsB), java.time.Clock.systemUTC()) + }) + .withPropertyValues("okapi.processor.enabled=false", "okapi.liquibase.enabled=false") + .run { ctx -> + val processor = ctx.getBean(OutboxProcessor::class.java) + val transactionRunner = ctx.getBean(TransactionRunner::class.java) + + val barrier = CyclicBarrier(5) + val executor = Executors.newVirtualThreadPerTaskExecutor() + val futures = (1..5).map { + CompletableFuture.supplyAsync( + { + barrier.await(10, TimeUnit.SECONDS) + transactionRunner.runInTransaction { processor.processNext(50) } + }, + executor, + ) + } + CompletableFuture.allOf(*futures.toTypedArray()).get(60, TimeUnit.SECONDS) + executor.shutdown() + + recorder.deliveryCount() shouldBe 50 + // The residual risk: with FOR UPDATE SKIP LOCKED collapsed to auto-commit, concurrent + // processors see overlapping result sets and at least one entry is delivered twice. + recorder.deliveries.filter { it.value.size > 1 }.shouldNotBeEmpty() + } + } +}) + +private object AmplificationDeliveryInfo : DeliveryInfo { + override val type: String = "recording" + override fun serialize(): String = """{"type":"recording"}""" +} diff --git a/okapi-spring-boot/build.gradle.kts b/okapi-spring-boot/build.gradle.kts index 9033019..926f702 100644 --- a/okapi-spring-boot/build.gradle.kts +++ b/okapi-spring-boot/build.gradle.kts @@ -1,10 +1,47 @@ plugins { id("buildsrc.convention.kotlin-jvm") id("buildsrc.convention.publish") + // Mutation testing — opt-in only: ./gradlew :okapi-spring-boot:pitest -PenableMutationTesting=true + id("info.solidsoft.pitest") version "1.19.0" apply false } description = "Spring Boot autoconfiguration for Okapi" +if (providers.gradleProperty("enableMutationTesting").orNull?.toBoolean() == true) { + apply(plugin = "info.solidsoft.pitest") + configure { + pitestVersion.set("1.17.0") + junit5PluginVersion.set("1.2.1") + targetClasses.set( + listOf( + "com.softwaremill.okapi.springboot.OutboxAutoConfiguration*", + "com.softwaremill.okapi.springboot.OutboxProcessorScheduler*", + "com.softwaremill.okapi.springboot.OutboxPurgerScheduler*", + "com.softwaremill.okapi.springboot.OkapiProperties*", + "com.softwaremill.okapi.springboot.SpringTransactionRunner*", + ), + ) + targetTests.set(listOf("com.softwaremill.okapi.springboot.*")) + excludedTestClasses.set( + listOf( + // Postgres testcontainer-based tests are too heavy per-mutation + "com.softwaremill.okapi.springboot.OutboxMysqlEndToEndTest", + ), + ) + threads.set(4) + outputFormats.set(listOf("HTML", "XML")) + timestampedReports.set(false) + } +} + +// spring-boot-transaction is a Spring Boot 4.0+ artifact (3.x bundles TransactionAutoConfiguration +// in spring-boot-autoconfigure). The CI matrix override -PspringBootVersion=3.5.x rewrites every +// org.springframework.boot:* coordinate, so unconditionally declaring spring-boot-transaction makes +// it try to resolve a non-existent spring-boot-transaction:3.5.x. Gate on the resolved major. +val springBootMajorForTests = ( + providers.gradleProperty("springBootVersion").orNull ?: libs.versions.springBoot.get() + ).substringBefore('.').toInt() + dependencies { implementation(project(":okapi-core")) @@ -45,8 +82,16 @@ dependencies { testImplementation(libs.micrometerCore) // Brings in the metrics auto-config jar so @AutoConfigureAfter targets are resolvable in tests. testImplementation(libs.springBootStarterActuator) + // TransactionAutoConfiguration: in Spring Boot 4.0+ it lives in its own spring-boot-transaction + // module; in 3.x it ships inside spring-boot-autoconfigure (already on the test classpath). + // TransactionTemplateHijackProofTest resolves whichever FQCN is present, so we only need the + // extra dependency on 4.x. + if (springBootMajorForTests >= 4) { + testImplementation(libs.springBootTransaction) + } // Logback's ListAppender is used to capture and assert WARN-level log output (e.g. the - // LiquibaseDisabledNotice breadcrumb) — slf4j-simple does not provide an introspectable appender. + // LiquibaseDisabledNotice breadcrumb + our PTM↔DS validation cannot-verify WARN) — slf4j-simple + // does not provide an introspectable appender. testImplementation(libs.logbackClassic) } diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiProperties.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiProperties.kt index d67d66c..2df8d16 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiProperties.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OkapiProperties.kt @@ -7,12 +7,17 @@ import org.springframework.validation.annotation.Validated @Validated data class OkapiProperties( val datasourceQualifier: String? = null, + val transactionManagerQualifier: String? = null, val liquibase: Liquibase = Liquibase(), ) { init { require(datasourceQualifier == null || datasourceQualifier.isNotBlank()) { "okapi.datasource-qualifier must not be blank. Set it to the bean name of the outbox DataSource, or remove the property." } + require(transactionManagerQualifier == null || transactionManagerQualifier.isNotBlank()) { + "okapi.transaction-manager-qualifier must not be blank. Set it to the bean name of the outbox " + + "PlatformTransactionManager, or remove the property." + } } /** diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt index 2eb6940..d801cc4 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfiguration.kt @@ -10,17 +10,25 @@ import com.softwaremill.okapi.core.OutboxPurgerConfig import com.softwaremill.okapi.core.OutboxSchedulerConfig import com.softwaremill.okapi.core.OutboxStore import com.softwaremill.okapi.core.RetryPolicy +import com.softwaremill.okapi.core.TransactionRunner import com.softwaremill.okapi.mysql.MysqlOutboxStore import com.softwaremill.okapi.postgres.PostgresOutboxStore +import org.slf4j.LoggerFactory +import org.springframework.beans.factory.BeanFactory +import org.springframework.beans.factory.BeanNotOfRequiredTypeException +import org.springframework.beans.factory.NoSuchBeanDefinitionException import org.springframework.beans.factory.ObjectProvider import org.springframework.boot.autoconfigure.AutoConfiguration import org.springframework.boot.autoconfigure.condition.ConditionalOnClass +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +import org.springframework.jdbc.datasource.DelegatingDataSource import org.springframework.transaction.PlatformTransactionManager +import org.springframework.transaction.support.ResourceTransactionManager import org.springframework.transaction.support.TransactionTemplate import java.time.Clock import javax.sql.DataSource @@ -28,6 +36,8 @@ import javax.sql.DataSource /** * Spring Boot autoconfiguration for the outbox processing pipeline. * + * Requires a [TransactionRunner] bean, or a [PlatformTransactionManager] from which one can be derived. + * * Required beans (must be provided by the application): * - One or more [MessageDeliverer] beans — transport implementations * (e.g. HttpMessageDeliverer, KafkaMessageDeliverer). @@ -40,14 +50,15 @@ import javax.sql.DataSource * If both are present, Postgres takes priority. Override by defining your own `@Bean OutboxStore`. * - [Clock] — defaults to [Clock.systemUTC] * - [RetryPolicy] — defaults to `maxRetries = 5` - * - [PlatformTransactionManager] — when present, scheduler/purger wrap each tick in a Spring - * transaction. When absent, store calls run in JDBC auto-commit mode, which narrows - * `FOR UPDATE SKIP LOCKED` to the claim itself and allows duplicate delivery across - * processor instances; configure one for any multi-instance deployment. * - * Multi-datasource support: + * Multi-datasource / multi-PlatformTransactionManager support: * - Set `okapi.datasource-qualifier` to the bean name of the [DataSource] that holds the outbox table. * When not set, the primary (or single) DataSource is used. + * - Set `okapi.transaction-manager-qualifier` to the bean name of the + * [PlatformTransactionManager] that brackets the outbox DataSource. When not set, the unique + * (or `@Primary`) PTM is used. With multi-DS setups always set this explicitly — silent + * PTM↔DataSource mismatch otherwise reduces `FOR UPDATE SKIP LOCKED` to JDBC auto-commit and + * permits duplicate delivery across processor instances. * * Liquibase support is provided by [OkapiLiquibaseAutoConfiguration], which is ordered after this * auto-config so that its `@ConditionalOnBean(OutboxStore::class)` gates can observe which @@ -76,6 +87,46 @@ class OutboxAutoConfiguration( fun springOutboxPublisher(outboxPublisher: OutboxPublisher): SpringOutboxPublisher = SpringOutboxPublisher(delegate = outboxPublisher, dataSource = resolveDataSource()) + // Created only when at least one scheduler is enabled — TransactionRunner has no other consumer. + // Skipping the bean in publish-only deployments (both schedulers disabled) lets users run without + // a PlatformTransactionManager on the classpath at all (e.g. message producer that delegates + // outbox processing to a separate worker). + // + // ObjectProvider design: a TT in the context can come from two sources — + // a user-defined @Bean (with custom timeout/propagation/isolation), OR Spring Boot's + // TransactionAutoConfiguration which auto-registers a TT whenever a single PTM exists. The two + // are indistinguishable at this layer. We accept BOTH transparently but always extract the bound + // PTM and run validatePtmDataSourceMatch on it — so the user's TX semantics are honoured AND the + // multi-DS safety net stays armed. See TransactionTemplateHijackProofTest for the empirical proof + // that without `validatePtmDataSourceMatch` here, Boot's auto-TT silently bypasses safety. + @Bean + @ConditionalOnMissingBean + @ConditionalOnExpression("\${okapi.processor.enabled:true} or \${okapi.purger.enabled:true}") + fun okapiTransactionRunner( + transactionManager: ObjectProvider, + transactionTemplate: ObjectProvider, + beanFactory: BeanFactory, + ): TransactionRunner { + val anyTemplate = transactionTemplate.getIfUnique() + // Extract the PTM from the TT if available, else resolve through the provider. TT's + // transactionManager is nullable in the API (a TT can be constructed without one) — fall + // back to the provider path in that pathological case so we still get a validated PTM. + val ptm = anyTemplate?.transactionManager + ?: resolvePlatformTransactionManager(transactionManager, beanFactory, okapiProperties) + validatePtmDataSourceMatch(ptm, resolveDataSource(), okapiProperties) + // Use the user-/Boot-supplied TT verbatim when present (preserves timeout, propagation, + // isolation set by whoever defined it). Otherwise build a minimal TT around the validated + // PTM. Sets the *initial* TX read-only flag to false. NOTE: with PROPAGATION_REQUIRED + // (the default), a tick that joins an outer @Transactional(readOnly = true) inherits the + // outer's flag and this setting is silently ignored. The scheduler runs on a daemon + // thread with no outer TX, so this flag actually takes effect — but invocations from + // inside an existing read-only TX would still hit FOR UPDATE failures. Keep scheduler + // invocations outside @Transactional scopes. + return SpringTransactionRunner( + anyTemplate ?: TransactionTemplate(ptm).apply { isReadOnly = false }, + ) + } + @Bean @ConditionalOnMissingBean fun outboxEntryProcessor( @@ -113,11 +164,11 @@ class OutboxAutoConfiguration( fun outboxProcessorScheduler( props: OutboxProcessorProperties, outboxProcessor: OutboxProcessor, - transactionManager: ObjectProvider, + transactionRunner: TransactionRunner, ): OutboxProcessorScheduler { return OutboxProcessorScheduler( outboxProcessor = outboxProcessor, - transactionTemplate = transactionManager.getIfAvailable()?.let { TransactionTemplate(it) }, + transactionRunner = transactionRunner, config = OutboxSchedulerConfig( interval = props.interval, batchSize = props.batchSize, @@ -131,12 +182,12 @@ class OutboxAutoConfiguration( fun outboxPurgerScheduler( props: OutboxPurgerProperties, outboxStore: OutboxStore, - transactionManager: ObjectProvider, + transactionRunner: TransactionRunner, clock: ObjectProvider, ): OutboxPurgerScheduler { return OutboxPurgerScheduler( outboxStore = outboxStore, - transactionTemplate = transactionManager.getIfAvailable()?.let { TransactionTemplate(it) }, + transactionRunner = transactionRunner, config = OutboxPurgerConfig( retention = props.retention, interval = props.interval, @@ -182,6 +233,8 @@ class OutboxAutoConfiguration( } companion object { + private val logger = LoggerFactory.getLogger(OutboxAutoConfiguration::class.java) + internal fun resolveDataSource( dataSources: Map, primaryDataSource: DataSource, @@ -195,5 +248,207 @@ class OutboxAutoConfiguration( "Available: ${dataSources.keys}", ) } + + internal fun resolvePlatformTransactionManager( + provider: ObjectProvider, + beanFactory: BeanFactory, + properties: OkapiProperties, + ): PlatformTransactionManager { + val qualifier = properties.transactionManagerQualifier + if (qualifier != null) { + return try { + beanFactory.getBean(qualifier, PlatformTransactionManager::class.java) + } catch (e: NoSuchBeanDefinitionException) { + throw NoSuchBeanDefinitionException( + qualifier, + "okapi.transaction-manager-qualifier='$qualifier' — no PlatformTransactionManager bean named " + + "'$qualifier' found. Check the bean name or remove the property to fall back to " + + "auto-resolution.", + ).apply { initCause(e) } + } catch (e: BeanNotOfRequiredTypeException) { + // Common typo: qualifier points to e.g. a DataSource bean name instead of a PTM + // bean name. Spring's default message ("Bean named 'X' is expected to be of type ... + // but was actually of type ...") doesn't mention okapi, so users searching for + // "okapi" in startup logs find nothing. Rewrap with okapi-specific context. + throw IllegalStateException( + "okapi.transaction-manager-qualifier='$qualifier' — bean named '$qualifier' exists " + + "but is of type '${e.actualType.name}', not a PlatformTransactionManager. Check " + + "the property value (likely a typo into a DataSource or other bean name) or " + + "remove it to fall back to auto-resolution.", + e, + ) + } + } + val unique = provider.getIfUnique() + if (unique != null) return unique + val available = provider.stream().toList() + throw NoSuchBeanDefinitionException( + TransactionRunner::class.java, + if (available.isEmpty()) { + "okapi-spring-boot requires a TransactionRunner bean to bracket each scheduler tick in a " + + "transaction. Configure spring-boot-starter-jdbc or spring-boot-starter-data-jpa (which " + + "provide a PlatformTransactionManager that okapi adapts automatically), or define your own " + + "@Bean TransactionRunner." + } else { + "Multiple PlatformTransactionManager beans found (${available.size}), none marked as @Primary. " + + "Mark the outbox PTM as @Primary, set okapi.transaction-manager-qualifier to disambiguate, " + + "or define an explicit @Bean TransactionRunner." + }, + ) + } + + internal fun validatePtmDataSourceMatch( + ptm: PlatformTransactionManager, + outboxDataSource: DataSource, + properties: OkapiProperties, + ) { + // Tries three extraction strategies in order: + // 1. ResourceTransactionManager whose resourceFactory is a JDBC DataSource (DST + custom) + // 2. JpaTransactionManager.getDataSource() — public getter, autodetected from EntityManagerFactory + // 3. HibernateTransactionManager.getDataSource() — same shape (both pre-/post-6.2 packages) + // Non-extractable cases (JTA, Exposed SpringTransactionManager, JPA/Hibernate with no JDBC DS) + // fall through to the WARN/INFO path. The WrongPtmDataSourceAmplificationProofTest empirically + // shows that wrong-DS bracketing collapses FOR UPDATE SKIP LOCKED to JDBC auto-commit and + // permits 100% delivery amplification — fail-fast when we can verify, log loudly when we cannot. + val ptmDataSource = extractDataSource(ptm) + if (ptmDataSource != null) { + // Spring's recommended pattern wraps the outbox DataSource bean in TransactionAwareDataSourceProxy + // (or LazyConnectionDataSourceProxy) for use by query helpers, while passing the raw DataSource + // to the PTM (Spring docs explicitly say "TransactionAwareDataSourceProxy should NOT be passed + // to a PTM"). Reference equality on those references would falsely fail. Unwrap the + // DelegatingDataSource chain on both sides before comparison. + val unwrappedPtm = unwrapDataSource(ptmDataSource) + val unwrappedOutbox = unwrapDataSource(outboxDataSource) + if (unwrappedPtm !== unwrappedOutbox) { + // If either side is still a DelegatingDataSource after unwrap, the chain terminated + // early — either a cycle (`setTargetDataSource(self)`) or a not-yet-initialised + // `LazyConnectionDataSourceProxy.targetDataSource == null`. Surface that as a distinct + // WARN so the operator looks at the proxy wiring instead of chasing the PTM↔DS error. + if (unwrappedPtm is DelegatingDataSource || unwrappedOutbox is DelegatingDataSource) { + logger.warn( + "Could not fully unwrap one or both DataSource sides — at least one " + + "DelegatingDataSource chain terminated early (cycle, or " + + "LazyConnectionDataSourceProxy with targetDataSource not yet set). " + + "PTM side: {} (stopped at {}). Outbox side: {} (stopped at {}). If the " + + "two are intended to wrap the same DataSource, fix the proxy chain " + + "before relying on the PTM↔DataSource mismatch error below.", + ptmDataSource, + unwrappedPtm, + outboxDataSource, + unwrappedOutbox, + ) + } + error( + "PlatformTransactionManager '${ptm.javaClass.name}' is bound to a different DataSource than " + + "okapi's outbox DataSource. PTM DataSource: $ptmDataSource. Outbox DataSource: " + + "$outboxDataSource (resolved via okapi.datasource-qualifier=" + + "'${properties.datasourceQualifier ?: ""}'). Each scheduler tick would otherwise " + + "wrap a transaction on the wrong DataSource and FOR UPDATE SKIP LOCKED would collapse " + + "to JDBC auto-commit, allowing duplicate delivery. Fix: set okapi.transaction-manager-" + + "qualifier to point at the PTM that brackets the outbox DataSource, or define an explicit " + + "@Bean TransactionRunner.", + ) + } + return + } + val resourceFactoryDescription = describeUnextractable(ptm) + if (properties.datasourceQualifier != null) { + logger.warn( + "okapi.datasource-qualifier='{}' is set, but the resolved PlatformTransactionManager '{}' {} " + + "— okapi cannot verify it brackets the outbox DataSource. If the PTM is bound to a different " + + "DataSource, scheduler ticks will silently run in JDBC auto-commit mode and FOR UPDATE SKIP " + + "LOCKED will collapse, allowing duplicate delivery across processor instances. Set " + + "okapi.transaction-manager-qualifier to disambiguate, or define an explicit @Bean " + + "TransactionRunner.", + properties.datasourceQualifier, + ptm.javaClass.name, + resourceFactoryDescription, + ) + } else { + // No qualifier set → single-DS assumption. We cannot validate, but a future multi-DS + // migration that forgets to set the qualifier would silently break with no diagnostic. + // Emit an INFO breadcrumb so an operator debugging duplicate delivery has something + // to grep for. + logger.info( + "PlatformTransactionManager '{}' {} — okapi cannot verify it brackets the outbox " + + "DataSource. Assuming single-DataSource setup (okapi.datasource-qualifier is " + + "unset). If you have or add multiple DataSources, set okapi.transaction-manager-" + + "qualifier (and okapi.datasource-qualifier) explicitly to avoid silent " + + "PTM↔DataSource mismatch.", + ptm.javaClass.name, + resourceFactoryDescription, + ) + } + } + + // Iterative + visited-set: defends against self-referencing or cyclic DelegatingDataSource + // chains (Spring's `setTargetDataSource` is a public setter with no cycle check, so + // misconfiguration like `proxy.setTargetDataSource(proxy)` is legal API). A tailrec form + // would compile to an uninterruptible JVM goto loop and silently spin at startup. + internal fun unwrapDataSource(ds: DataSource): DataSource { + val seen = mutableSetOf() + var current: DataSource = ds + while (current is DelegatingDataSource) { + if (!seen.add(current)) return current + current = current.targetDataSource ?: return current + } + return current + } + + // JPA/Hibernate PTMs that expose a `public DataSource getDataSource()` — reflection by name + // avoids requiring spring-orm on the compile classpath (it's optional for JDBC-only consumers). + // Hibernate's TM moved package in Spring 6.2 — both names are listed so a single okapi build + // works against Spring Framework 6.1.x and 6.2+ without a version matrix. + // Hibernate's TM is named `org.springframework.orm.jpa.hibernate.HibernateTransactionManager` + // in Spring 6.2+ and `org.springframework.orm.hibernate5.HibernateTransactionManager` in + // Spring 6.1-. Both are listed so a single build works across versions. + // `internal` so reflection-resolution tests can verify the set isn't all stale. + internal val JPA_HIBERNATE_PTM_CLASSES = setOf( + "org.springframework.orm.jpa.JpaTransactionManager", + "org.springframework.orm.jpa.hibernate.HibernateTransactionManager", + "org.springframework.orm.hibernate5.HibernateTransactionManager", + ) + + internal fun extractDataSource(ptm: PlatformTransactionManager): DataSource? { + // Strategy 1: ResourceTransactionManager (DST + any RTM whose resourceFactory is a DataSource). + (ptm as? ResourceTransactionManager)?.resourceFactory?.let { rf -> + if (rf is DataSource) return rf + } + // Strategy 2/3: JPA/Hibernate public `getDataSource()`. Narrow catch on + // NoSuchMethodException only — all other exceptions intentionally propagate: + // LinkageError / NoClassDefFoundError → mixed-jar classpath bug, must surface (this + // is why we don't use `runCatching`, which would swallow them). + // InvocationTargetException → JpaTransactionManager constructed without an EMF; + // callable, but unusable for outbox bracketing — surface the original cause. + // IllegalAccessException / ClassCastException → incompatible Spring framework + // version where the contract has changed; not safe to silently fall back. + if (ptm.javaClass.name in JPA_HIBERNATE_PTM_CLASSES) { + return try { + ptm.javaClass.getMethod("getDataSource").invoke(ptm) as DataSource? + } catch (_: NoSuchMethodException) { + null + } + } + return null + } + + private fun describeUnextractable(ptm: PlatformTransactionManager): String { + if (ptm.javaClass.name in JPA_HIBERNATE_PTM_CLASSES) { + return "is ${ptm.javaClass.name} but its getDataSource() returned null — the " + + "EntityManagerFactory/SessionFactory was constructed without a JDBC DataSource " + + "(typical for pure-JTA / JNDI-only setups). okapi cannot verify the binding" + } + val rtmResourceFactory = (ptm as? ResourceTransactionManager)?.resourceFactory + return when { + ptm !is ResourceTransactionManager -> + "does not implement ResourceTransactionManager (no resource factory exposed; same shape as " + + "JtaTransactionManager or Exposed's SpringTransactionManager)" + rtmResourceFactory == null -> + "implements ResourceTransactionManager but its getResourceFactory() returned null" + else -> + "implements ResourceTransactionManager but its resourceFactory is of type " + + "'${rtmResourceFactory.javaClass.name}', not a JDBC DataSource" + } + } } } diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorScheduler.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorScheduler.kt index d4fc485..4a1c461 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorScheduler.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorScheduler.kt @@ -3,8 +3,8 @@ package com.softwaremill.okapi.springboot import com.softwaremill.okapi.core.OutboxProcessor import com.softwaremill.okapi.core.OutboxScheduler import com.softwaremill.okapi.core.OutboxSchedulerConfig +import com.softwaremill.okapi.core.TransactionRunner import org.springframework.context.SmartLifecycle -import org.springframework.transaction.support.TransactionTemplate /** * Spring lifecycle wrapper for [OutboxScheduler]. @@ -17,13 +17,13 @@ import org.springframework.transaction.support.TransactionTemplate */ class OutboxProcessorScheduler( outboxProcessor: OutboxProcessor, - transactionTemplate: TransactionTemplate?, + transactionRunner: TransactionRunner, config: OutboxSchedulerConfig = OutboxSchedulerConfig(), ) : SmartLifecycle { private val scheduler = OutboxScheduler( outboxProcessor = outboxProcessor, - transactionRunner = transactionTemplate?.let { SpringTransactionRunner(it) }, + transactionRunner = transactionRunner, config = config, ) diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt index 1251ed3..2753837 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerScheduler.kt @@ -3,8 +3,8 @@ package com.softwaremill.okapi.springboot import com.softwaremill.okapi.core.OutboxPurger import com.softwaremill.okapi.core.OutboxPurgerConfig import com.softwaremill.okapi.core.OutboxStore +import com.softwaremill.okapi.core.TransactionRunner import org.springframework.context.SmartLifecycle -import org.springframework.transaction.support.TransactionTemplate import java.time.Clock /** @@ -15,14 +15,14 @@ import java.time.Clock */ class OutboxPurgerScheduler( outboxStore: OutboxStore, - transactionTemplate: TransactionTemplate? = null, + transactionRunner: TransactionRunner, config: OutboxPurgerConfig = OutboxPurgerConfig(), clock: Clock = Clock.systemUTC(), ) : SmartLifecycle { private val purger = OutboxPurger( outboxStore = outboxStore, - transactionRunner = transactionTemplate?.let { SpringTransactionRunner(it) }, + transactionRunner = transactionRunner, config = config, clock = clock, ) diff --git a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/SpringTransactionRunner.kt b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/SpringTransactionRunner.kt index 60ca8b1..3890bae 100644 --- a/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/SpringTransactionRunner.kt +++ b/okapi-spring-boot/src/main/kotlin/com/softwaremill/okapi/springboot/SpringTransactionRunner.kt @@ -7,7 +7,7 @@ import org.springframework.transaction.support.TransactionTemplate * Spring implementation of [TransactionRunner] using [TransactionTemplate]. */ class SpringTransactionRunner( - private val transactionTemplate: TransactionTemplate, + val transactionTemplate: TransactionTemplate, ) : TransactionRunner { override fun runInTransaction(block: () -> T): T = transactionTemplate.execute { block() }!! } diff --git a/okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json b/okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json index 78305fe..214d96e 100644 --- a/okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json +++ b/okapi-spring-boot/src/main/resources/META-INF/spring-configuration-metadata.json @@ -81,6 +81,12 @@ "defaultValue": null, "description": "Name of the DataSource bean to use for the outbox. When set, okapi uses this specific DataSource instead of the primary one. Required in multi-datasource setups where the outbox table is not in the primary database." }, + { + "name": "okapi.transaction-manager-qualifier", + "type": "java.lang.String", + "defaultValue": null, + "description": "Name of the PlatformTransactionManager bean that brackets the outbox DataSource. When not set, the unique (or @Primary) PTM is used. In multi-PTM setups always set this explicitly so okapi resolves the PTM bound to the outbox DataSource instead of the application's primary; silent PTM↔DataSource mismatch otherwise reduces FOR UPDATE SKIP LOCKED to JDBC auto-commit and permits duplicate delivery across processor instances." + }, { "name": "okapi.metrics.refresh-interval", "type": "java.time.Duration", diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/DataSourceQualifierAutoConfigurationTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/DataSourceQualifierAutoConfigurationTest.kt index 8bb5c15..c45ad39 100644 --- a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/DataSourceQualifierAutoConfigurationTest.kt +++ b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/DataSourceQualifierAutoConfigurationTest.kt @@ -5,6 +5,7 @@ import com.softwaremill.okapi.core.MessageDeliverer import com.softwaremill.okapi.core.OutboxEntry import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.core.OutboxStore +import com.softwaremill.okapi.core.TransactionRunner import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.nulls.shouldNotBeNull import io.kotest.matchers.string.shouldContain @@ -23,6 +24,7 @@ class DataSourceQualifierAutoConfigurationTest : FunSpec({ .withConfiguration(AutoConfigurations.of(OutboxAutoConfiguration::class.java)) .withBean(OutboxStore::class.java, { stubStore() }) .withBean(MessageDeliverer::class.java, { stubDeliverer() }) + .withBean(TransactionRunner::class.java, { noOpTransactionRunner() }) test("no qualifier set, single datasource — uses that datasource") { val ds = SimpleDriverDataSource() @@ -89,6 +91,10 @@ class DataSourceQualifierAutoConfigurationTest : FunSpec({ } }) +private fun noOpTransactionRunner() = object : TransactionRunner { + override fun runInTransaction(block: () -> T): T = block() +} + private fun stubStore() = object : OutboxStore { override fun persist(entry: OutboxEntry) = entry override fun claimPending(limit: Int) = emptyList() diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/LiquibaseAutoConfigurationTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/LiquibaseAutoConfigurationTest.kt index f31ec54..0a91432 100644 --- a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/LiquibaseAutoConfigurationTest.kt +++ b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/LiquibaseAutoConfigurationTest.kt @@ -9,6 +9,7 @@ import com.softwaremill.okapi.core.MessageDeliverer import com.softwaremill.okapi.core.OutboxEntry import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.core.OutboxStore +import com.softwaremill.okapi.core.TransactionRunner import io.kotest.assertions.throwables.shouldThrow import io.kotest.assertions.withClue import io.kotest.core.spec.style.FunSpec @@ -63,6 +64,7 @@ class LiquibaseAutoConfigurationTest : FunSpec({ .withBean(OutboxStore::class.java, { stubStore() }) .withBean(MessageDeliverer::class.java, { stubDeliverer() }) .withBean(DataSource::class.java, { SimpleDriverDataSource() }) + .withBean(TransactionRunner::class.java, { noOpTransactionRunner() }) .withOkapiLiquibaseDisabled() context("postgres liquibase") { @@ -183,6 +185,7 @@ class LiquibaseAutoConfigurationTest : FunSpec({ .withBean(OutboxStore::class.java, { stubStore() }) .withBean(MessageDeliverer::class.java, { stubDeliverer() }) .withBean(DataSource::class.java, { SimpleDriverDataSource() }) + .withBean(TransactionRunner::class.java, { noOpTransactionRunner() }) .withPropertyValues("okapi.liquibase.enabled=true") .run { ctx -> ctx.getBeansOfType(OkapiLiquibaseAutoConfiguration.LiquibaseDisabledNotice::class.java) @@ -214,6 +217,7 @@ class LiquibaseAutoConfigurationTest : FunSpec({ .withBean(OutboxStore::class.java, { stubStore() }) .withBean(MessageDeliverer::class.java, { stubDeliverer() }) .withBean(DataSource::class.java, { SimpleDriverDataSource() }) + .withBean(TransactionRunner::class.java, { noOpTransactionRunner() }) .withPropertyValues("okapi.liquibase.enabled=false") .run { ctx -> ctx.getBeansOfType(OkapiLiquibaseAutoConfiguration.LiquibaseDisabledNotice::class.java) @@ -413,6 +417,7 @@ class LiquibaseAutoConfigurationTest : FunSpec({ .withConfiguration(AutoConfigurations.of(OutboxAutoConfiguration::class.java, OkapiLiquibaseAutoConfiguration::class.java)) .withBean(MessageDeliverer::class.java, { stubDeliverer() }) .withBean(DataSource::class.java, { SimpleDriverDataSource() }) + .withBean(TransactionRunner::class.java, { noOpTransactionRunner() }) .withInitializer { ctx -> ctx.beanFactory.addBeanPostProcessor(SuppressSpringLiquibaseRun()) } @@ -466,6 +471,7 @@ class LiquibaseAutoConfigurationTest : FunSpec({ .withConfiguration(AutoConfigurations.of(OutboxAutoConfiguration::class.java, OkapiLiquibaseAutoConfiguration::class.java)) .withBean(MessageDeliverer::class.java, { stubDeliverer() }) .withBean(DataSource::class.java, { SimpleDriverDataSource() }) + .withBean(TransactionRunner::class.java, { noOpTransactionRunner() }) .withBean("okapiPostgresLiquibase", SpringLiquibase::class.java, { userBean }) .withInitializer { ctx -> ctx.beanFactory.addBeanPostProcessor(SuppressSpringLiquibaseRun()) @@ -509,6 +515,10 @@ private fun canLoadClass(fqcn: String, classLoader: ClassLoader): Boolean = try false } +private fun noOpTransactionRunner() = object : TransactionRunner { + override fun runInTransaction(block: () -> T): T = block() +} + private fun stubStore() = object : OutboxStore { override fun persist(entry: OutboxEntry) = entry override fun claimPending(limit: Int) = emptyList() diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfigurationTransactionRunnerTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfigurationTransactionRunnerTest.kt new file mode 100644 index 0000000..87c7035 --- /dev/null +++ b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxAutoConfigurationTransactionRunnerTest.kt @@ -0,0 +1,625 @@ +package com.softwaremill.okapi.springboot + +import ch.qos.logback.classic.Level +import ch.qos.logback.classic.Logger +import ch.qos.logback.classic.spi.ILoggingEvent +import ch.qos.logback.core.read.ListAppender +import com.softwaremill.okapi.core.DeliveryResult +import com.softwaremill.okapi.core.MessageDeliverer +import com.softwaremill.okapi.core.OutboxEntry +import com.softwaremill.okapi.core.OutboxStatus +import com.softwaremill.okapi.core.OutboxStore +import com.softwaremill.okapi.core.TransactionRunner +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.collections.shouldNotBeEmpty +import io.kotest.matchers.nulls.shouldBeNull +import io.kotest.matchers.nulls.shouldNotBeNull +import io.kotest.matchers.shouldBe +import io.kotest.matchers.string.shouldContain +import io.kotest.matchers.types.shouldBeInstanceOf +import io.kotest.matchers.types.shouldBeSameInstanceAs +import org.h2.jdbcx.JdbcDataSource +import org.slf4j.LoggerFactory +import org.springframework.beans.factory.NoSuchBeanDefinitionException +import org.springframework.beans.factory.support.BeanDefinitionBuilder +import org.springframework.boot.autoconfigure.AutoConfigurations +import org.springframework.boot.test.context.runner.ApplicationContextRunner +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.context.annotation.Primary +import org.springframework.context.support.GenericApplicationContext +import org.springframework.jdbc.datasource.DataSourceTransactionManager +import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy +import org.springframework.jdbc.datasource.SimpleDriverDataSource +import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy +import org.springframework.transaction.PlatformTransactionManager +import org.springframework.transaction.TransactionDefinition +import org.springframework.transaction.support.AbstractPlatformTransactionManager +import org.springframework.transaction.support.DefaultTransactionStatus +import org.springframework.transaction.support.ResourceTransactionManager +import org.springframework.transaction.support.TransactionSynchronizationManager +import java.time.Instant +import javax.sql.DataSource +import kotlin.time.Duration.Companion.seconds + +class OutboxAutoConfigurationTransactionRunnerTest : FunSpec({ + + val baseRunner = ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(OutboxAutoConfiguration::class.java)) + .withBean(OutboxStore::class.java, { stubStore() }) + .withBean(MessageDeliverer::class.java, { stubDeliverer() }) + + test("derives SpringTransactionRunner from PlatformTransactionManager when present") { + val ds: DataSource = SimpleDriverDataSource() + baseRunner + .withBean(DataSource::class.java, { ds }) + .withBean(PlatformTransactionManager::class.java, { DataSourceTransactionManager(ds) }) + .run { ctx -> + ctx.getBean(TransactionRunner::class.java).shouldBeInstanceOf() + } + } + + test("auto-built TransactionTemplate is NOT read-only (defends against globally read-only TX defaults)") { + val ds: DataSource = h2DataSource() + baseRunner + .withBean(DataSource::class.java, { ds }) + .withBean(PlatformTransactionManager::class.java, { DataSourceTransactionManager(ds) }) + .run { ctx -> + val runner = ctx.getBean(TransactionRunner::class.java) + runner.runInTransaction { TransactionSynchronizationManager.isCurrentTransactionReadOnly() } shouldBe false + } + } + + test("okapi.transaction-manager-qualifier YAML key binds to OkapiProperties.transactionManagerQualifier (kebab → camelCase)") { + // Pins the property name contract — without this a future rename of the Kotlin field + // (e.g. transactionManagerQualifier → txManagerQualifier) silently breaks user + // configuration. Mirror of LiquibaseAutoConfigurationTest's "okapi.liquibase.* properties + // bind to nested config" test. + baseRunner + .withBean(DataSource::class.java, { SimpleDriverDataSource() }) + .withBean(TransactionRunner::class.java, { + object : TransactionRunner { + override fun runInTransaction(block: () -> T): T = block() + } + }) + .withPropertyValues("okapi.transaction-manager-qualifier=outboxTm") + .run { ctx -> + ctx.getBean(OkapiProperties::class.java).transactionManagerQualifier shouldBe "outboxTm" + } + } + + test("blank okapi.transaction-manager-qualifier triggers startup failure with require() message") { + // Pins that init { require(isNotBlank()) } actually propagates through Spring's Binder. + // A future refactor of OkapiProperties that moves the require() outside init or onto a + // getter would silently let blank qualifiers through and cause a confusing + // NoSuchBeanDefinitionException at PTM lookup. Mirror of LiquibaseAutoConfigurationTest's + // "blank changelog-table property triggers startup failure". + baseRunner + .withBean(DataSource::class.java, { SimpleDriverDataSource() }) + .withBean(TransactionRunner::class.java, { + object : TransactionRunner { + override fun runInTransaction(block: () -> T): T = block() + } + }) + .withPropertyValues("okapi.transaction-manager-qualifier= ") + .run { ctx -> + val rootCause = generateSequence(ctx.startupFailure) { it.cause }.last() + rootCause.message.shouldNotBeNull() + .shouldContain("okapi.transaction-manager-qualifier must not be blank") + } + } + + test("publish-only deployment: both schedulers disabled + no PTM + no user TransactionRunner — context starts cleanly") { + // Publish-only mode (e.g. message-producing service that delegates outbox processing to a separate + // worker). The TransactionRunner is only needed by scheduler/purger; with both disabled, no PTM + // should be required. Without this gate, okapi-spring-boot would prevent users from running + // publish-only with arbitrary PTMs (or none at all). + baseRunner + .withBean(DataSource::class.java, { SimpleDriverDataSource() }) + .withPropertyValues( + "okapi.processor.enabled=false", + "okapi.purger.enabled=false", + ) + .run { ctx -> + ctx.startupFailure.shouldBeNull() + ctx.containsBean("okapiTransactionRunner") shouldBe false + ctx.containsBean("outboxProcessorScheduler") shouldBe false + ctx.containsBean("outboxPurgerScheduler") shouldBe false + ctx.getBean(SpringOutboxPublisher::class.java).shouldNotBeNull() + } + } + + test("fails context refresh with actionable message when no PTM and no user TransactionRunner") { + baseRunner + .withBean(DataSource::class.java, { SimpleDriverDataSource() }) + .run { ctx -> + val failure = ctx.startupFailure + failure.shouldNotBeNull() + val rootCause = generateSequence(failure as Throwable?) { it.cause }.last() + rootCause.shouldBeInstanceOf() + rootCause.message.shouldNotBeNull().also { + it.shouldContain("okapi-spring-boot requires a TransactionRunner bean") + it.shouldContain("PlatformTransactionManager") + it.shouldContain("@Bean TransactionRunner") + } + } + } + + test("user-provided @Bean TransactionTemplate is honoured (autoconfig does not silently shadow it)") { + // If a user defines a custom TransactionTemplate (e.g. with non-default timeout / propagation / + // isolation), the autoconfig MUST use that exact template — not silently build its own fresh + // TransactionTemplate(ptm) from the PTM. A "silent shadow" regression would discard the user's + // intent for scheduler ticks while their @Transactional code paths use the configured template. + val ds: DataSource = SimpleDriverDataSource() + baseRunner + .withBean(DataSource::class.java, { ds }) + .withBean(PlatformTransactionManager::class.java, { DataSourceTransactionManager(ds) }) + .withUserConfiguration(CustomTransactionTemplateConfiguration::class.java) + .run { ctx -> + val runner = ctx.getBean(TransactionRunner::class.java).shouldBeInstanceOf() + runner.transactionTemplate.shouldBeSameInstanceAs(CustomTransactionTemplateConfiguration.TEMPLATE) + } + } + + test("user-provided TransactionRunner bean is honoured without PTM (@ConditionalOnMissingBean)") { + baseRunner + .withBean(DataSource::class.java, { SimpleDriverDataSource() }) + .withUserConfiguration(CustomRunnerConfiguration::class.java) + .run { ctx -> + ctx.getBean(TransactionRunner::class.java).shouldBeSameInstanceAs(CustomRunnerConfiguration.RUNNER) + } + } + + test("fails with distinct message when multiple PTMs are present and none is @Primary") { + val ds: DataSource = SimpleDriverDataSource() + baseRunner + .withBean(DataSource::class.java, { ds }) + .withUserConfiguration(TwoPtmsNoPrimaryUserConfig::class.java) + .run { ctx -> + val failure = ctx.startupFailure + failure.shouldNotBeNull() + val rootCause = generateSequence(failure as Throwable?) { it.cause }.last() + rootCause.shouldBeInstanceOf() + rootCause.message.shouldNotBeNull().also { + it.shouldContain("Multiple PlatformTransactionManager beans found") + it.shouldContain("@Primary") + it.shouldContain("okapi.transaction-manager-qualifier") + } + } + } + + test("uses @Primary PTM when multiple are present") { + val ds: DataSource = SimpleDriverDataSource() + baseRunner + .withBean(DataSource::class.java, { ds }) + .withUserConfiguration(PrimaryDstAndDummyPtmConfig::class.java) + .run { ctx -> + // PrimaryDstAndDummyPtmConfig.primaryTm is the @Primary DST → autoconfig should derive runner from it. + // If autoconfig (wrongly) picked the DummyTm, validation would not fire (DummyTm is not RTM) + // and the runner would still build — so we assert no startup failure as the strongest signal. + ctx.getBean(TransactionRunner::class.java).shouldBeInstanceOf() + } + } + + test("okapi.transaction-manager-qualifier picks the named PTM (overrides @Primary), proven via DS-match validation") { + // Setup designed so a buggy "ignore qualifier" implementation would FAIL: + // - appDs (primary), outboxDs (secondary) DataSources + // - appTm = DST(appDs), @Primary + // - outboxTm = DST(outboxDs) + // - okapi.datasource-qualifier=outboxDs (so resolveDataSource() == outboxDs) + // - okapi.transaction-manager-qualifier=outboxTm + // Correct: qualifier picks outboxTm → validation: outboxTm.dataSource == outboxDs → ok. + // Buggy (qualifier ignored): @Primary appTm picked → validation: appDs ≠ outboxDs → fail-fast. + val appDs: DataSource = SimpleDriverDataSource() + val outboxDs: DataSource = SimpleDriverDataSource() + ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(OutboxAutoConfiguration::class.java)) + .withBean(OutboxStore::class.java, { stubStore() }) + .withBean(MessageDeliverer::class.java, { stubDeliverer() }) + .withInitializer { context -> + val gac = context as GenericApplicationContext + gac.registerBeanDefinition( + "appDs", + BeanDefinitionBuilder.genericBeanDefinition(DataSource::class.java) { appDs }.beanDefinition + .apply { isPrimary = true }, + ) + gac.registerBeanDefinition( + "outboxDs", + BeanDefinitionBuilder.genericBeanDefinition(DataSource::class.java) { outboxDs }.beanDefinition, + ) + gac.registerBeanDefinition( + "appTm", + BeanDefinitionBuilder.genericBeanDefinition(PlatformTransactionManager::class.java) { + DataSourceTransactionManager(appDs) + }.beanDefinition.apply { isPrimary = true }, + ) + gac.registerBeanDefinition( + "outboxTm", + BeanDefinitionBuilder.genericBeanDefinition(PlatformTransactionManager::class.java) { + DataSourceTransactionManager(outboxDs) + }.beanDefinition, + ) + } + .withPropertyValues( + "okapi.datasource-qualifier=outboxDs", + "okapi.transaction-manager-qualifier=outboxTm", + ) + .run { ctx -> + ctx.startupFailure.shouldBeNull() + ctx.getBean(TransactionRunner::class.java).shouldBeInstanceOf() + } + } + + test("ResourceTransactionManager PTM with non-DataSource resourceFactory: WARN includes actual resourceFactory class name") { + // BUG C1 test (above, around line 254) covers the WARN-is-emitted contract for this case. + // This test pins the diagnostic CONTENT: the WARN must mention the actual resourceFactory + // class so a user with a custom RTM (or buggy subclass returning null) sees the real + // resource type, not just the static example list (JpaTransactionManager/Hibernate). Without + // this assertion a refactor that removed the dynamic class reference from the WARN would + // silently downgrade the diagnostic for non-JPA users. + val ds: DataSource = SimpleDriverDataSource() + val customRtm = JpaLikeRtmTransactionManager(resourceFactory = "myDistinctiveResourceFactory") + val targetLogger = LoggerFactory.getLogger(OutboxAutoConfiguration::class.java) as Logger + val appender = ListAppender().apply { start() } + targetLogger.addAppender(appender) + try { + baseRunner + .withBean("outboxDs", DataSource::class.java, { ds }) + .withBean(PlatformTransactionManager::class.java, { customRtm }) + .withPropertyValues("okapi.datasource-qualifier=outboxDs") + .run { ctx -> ctx.startupFailure.shouldBeNull() } + val warnText = appender.list.filter { it.level == Level.WARN }.joinToString("\n") { it.formattedMessage } + // resourceFactory is a String — WARN must mention its class so user sees what was actually + // exposed, not just the JpaTransactionManager/Hibernate examples. + warnText.shouldContain("java.lang.String") + } finally { + targetLogger.detachAppender(appender) + } + } + + test("non-ResourceTransactionManager PTM without okapi.datasource-qualifier (single-DS assumption): INFO breadcrumb emitted") { + // When validation cannot run AND no qualifier is set, the autoconfig assumes single-DS — but + // a future multi-DS migration that forgets to set the qualifier would silently break with no + // diagnostic. An INFO breadcrumb at startup gives the operator something to grep for when + // duplicate delivery starts to occur post-migration. + val ds: DataSource = SimpleDriverDataSource() + val targetLogger = LoggerFactory.getLogger(OutboxAutoConfiguration::class.java) as Logger + val appender = ListAppender().apply { start() } + targetLogger.addAppender(appender) + try { + baseRunner + .withBean(DataSource::class.java, { ds }) + .withBean(PlatformTransactionManager::class.java, { DummyTransactionManager() }) + // no okapi.datasource-qualifier + .run { ctx -> ctx.startupFailure.shouldBeNull() } + val infos = appender.list.filter { it.level == Level.INFO } + val combined = infos.joinToString("\n") { it.formattedMessage } + combined.shouldContain("does not implement ResourceTransactionManager") + combined.shouldContain("okapi.transaction-manager-qualifier") + } finally { + targetLogger.detachAppender(appender) + } + } + + test("non-ResourceTransactionManager PTM with okapi.datasource-qualifier set: context starts AND emits actionable WARN") { + // DummyTransactionManager extends AbstractPlatformTransactionManager but does NOT implement + // ResourceTransactionManager — same shape as Exposed's SpringTransactionManager. + // Validation cannot extract the PTM's DataSource, so it logs a WARN and allows the context to start. + // Captures the WARN content so deletion of the warn() body (or its message format) is also caught — + // not just deletion of the conditional that emits it. Without this assertion, the operator-facing + // breadcrumb could silently rot without breaking any other test. + val ds: DataSource = SimpleDriverDataSource() + val targetLogger = LoggerFactory.getLogger(OutboxAutoConfiguration::class.java) as Logger + val appender = ListAppender().apply { start() } + targetLogger.addAppender(appender) + try { + baseRunner + .withBean("outboxDs", DataSource::class.java, { ds }) + .withBean(PlatformTransactionManager::class.java, { DummyTransactionManager() }) + .withPropertyValues("okapi.datasource-qualifier=outboxDs") + .run { ctx -> + ctx.startupFailure.shouldBeNull() + ctx.getBean(TransactionRunner::class.java).shouldBeInstanceOf() + } + val warnings = appender.list.filter { it.level == Level.WARN } + warnings.shouldNotBeEmpty() + val combined = warnings.joinToString("\n") { it.formattedMessage } + combined.shouldContain("okapi.datasource-qualifier") + combined.shouldContain("does not implement ResourceTransactionManager") + combined.shouldContain("okapi.transaction-manager-qualifier") + } finally { + targetLogger.detachAppender(appender) + } + } + + test("okapi.transaction-manager-qualifier pointing to a nonexistent bean fails with actionable message") { + val ds: DataSource = SimpleDriverDataSource() + baseRunner + .withBean(DataSource::class.java, { ds }) + .withBean(PlatformTransactionManager::class.java, { DataSourceTransactionManager(ds) }) + .withPropertyValues("okapi.transaction-manager-qualifier=missingTm") + .run { ctx -> + val failure = ctx.startupFailure + failure.shouldNotBeNull() + val allMessages = generateSequence(failure as Throwable?) { it.cause }.mapNotNull { it.message }.toList() + allMessages.any { it.contains("okapi.transaction-manager-qualifier") } shouldBe true + allMessages.any { it.contains("missingTm") } shouldBe true + } + } + + test("okapi.transaction-manager-qualifier pointing to a bean of wrong type (e.g. DataSource) fails with actionable message") { + // Common typo: user means `okapi.transaction-manager-qualifier=outboxTm` but writes + // `=outboxDs` (the DataSource bean name). Spring throws `BeanNotOfRequiredTypeException`, + // NOT `NoSuchBeanDefinitionException`, so a naive `catch (NoSuchBeanDefinitionException)` + // never wraps the error and the user sees a cryptic message without okapi context. + val outboxDs: DataSource = SimpleDriverDataSource() + baseRunner + .withBean("outboxDs", DataSource::class.java, { outboxDs }) + .withBean("outboxTm", PlatformTransactionManager::class.java, { DataSourceTransactionManager(outboxDs) }) + // typo: pointing to the DataSource bean instead of the PTM bean + .withPropertyValues("okapi.transaction-manager-qualifier=outboxDs") + .run { ctx -> + val failure = ctx.startupFailure + failure.shouldNotBeNull() + val allMessages = generateSequence(failure as Throwable?) { it.cause }.mapNotNull { it.message }.toList() + allMessages.any { it.contains("okapi.transaction-manager-qualifier") } shouldBe true + allMessages.any { it.contains("outboxDs") } shouldBe true + allMessages.any { + it.contains("not a PlatformTransactionManager") || it.contains("PlatformTransactionManager") + } shouldBe true + } + } + + test("PTM bound to a different DataSource than outbox fails-fast with PTM↔DS mismatch message") { + val appDs: DataSource = SimpleDriverDataSource() + val outboxDs: DataSource = SimpleDriverDataSource() + ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(OutboxAutoConfiguration::class.java)) + .withBean(OutboxStore::class.java, { stubStore() }) + .withBean(MessageDeliverer::class.java, { stubDeliverer() }) + .withInitializer { context -> + val gac = context as GenericApplicationContext + gac.registerBeanDefinition( + "appDs", + BeanDefinitionBuilder.genericBeanDefinition(DataSource::class.java) { appDs }.beanDefinition + .apply { isPrimary = true }, + ) + gac.registerBeanDefinition( + "outboxDs", + BeanDefinitionBuilder.genericBeanDefinition(DataSource::class.java) { outboxDs }.beanDefinition, + ) + gac.registerBeanDefinition( + "appTm", + BeanDefinitionBuilder.genericBeanDefinition(PlatformTransactionManager::class.java) { + DataSourceTransactionManager(appDs) + }.beanDefinition, + ) + } + .withPropertyValues("okapi.datasource-qualifier=outboxDs") + .run { ctx -> + val failure = ctx.startupFailure + failure.shouldNotBeNull() + val rootCause = generateSequence(failure as Throwable?) { it.cause }.last() + rootCause.message.shouldNotBeNull().also { + it.shouldContain("bound to a different DataSource") + it.shouldContain("FOR UPDATE SKIP LOCKED") + it.shouldContain("okapi.transaction-manager-qualifier") + } + } + } + + // ----------------------------------------------------------------------------------------- + // C1 regression guard: ResourceTransactionManager with non-DataSource resourceFactory. + // JpaTransactionManager and HibernateTransactionManager both implement ResourceTransactionManager + // but their resourceFactory is EntityManagerFactory or SessionFactory respectively — NOT a + // DataSource. (JtaTransactionManager doesn't implement RTM at all and falls through the same + // WARN branch as Exposed's SpringTransactionManager.) Earlier shape of validatePtmDataSourceMatch + // did `as? DataSource ?: return`, silently early-returning and bypassing the WARN. This test + // pins that the cast failure now falls through to the WARN branch instead. + // ----------------------------------------------------------------------------------------- + test( + "BUG C1: RTM with non-DataSource resourceFactory + okapi.datasource-qualifier set: WARN should be logged but currently is silent", + ) { + val ds: DataSource = SimpleDriverDataSource() + val jpaLikeTm = JpaLikeRtmTransactionManager(resourceFactory = Any()) + + val targetLogger = LoggerFactory.getLogger(OutboxAutoConfiguration::class.java) as Logger + val appender = ListAppender().apply { start() } + targetLogger.addAppender(appender) + try { + baseRunner + .withBean("outboxDs", DataSource::class.java, { ds }) + .withBean(PlatformTransactionManager::class.java, { jpaLikeTm }) + .withPropertyValues("okapi.datasource-qualifier=outboxDs") + .run { ctx -> + ctx.startupFailure.shouldBeNull() + ctx.getBean(TransactionRunner::class.java).shouldBeInstanceOf() + } + val warningsAboutValidation = appender.list.filter { + it.level == Level.WARN && it.formattedMessage.contains("okapi.datasource-qualifier") + } + warningsAboutValidation.shouldNotBeEmpty() + } finally { + targetLogger.detachAppender(appender) + } + } + + // ----------------------------------------------------------------------------------------- + // C2 demonstration: TransactionAwareDataSourceProxy false-positive. + // Spring's recommended pattern for outbox-style scenarios is to register + // TransactionAwareDataSourceProxy(rawDs) as the application-facing bean for query helpers, while + // the PlatformTransactionManager is wired with the RAW DataSource (Spring docs explicitly say: + // "TransactionAwareDataSourceProxy should NOT be passed to a PTM"). With this correct pattern: + // - ptm.resourceFactory === rawDs + // - resolveDataSource() === proxyDs (the bean qualified by okapi.datasource-qualifier) + // Our validation uses reference equality (`!==`) → fail-fast on a *correctly* configured app. + // ----------------------------------------------------------------------------------------- + test("BUG C2: TransactionAwareDataSourceProxy(rawDs) on outbox bean + PTM(rawDs) is Spring's recommended pattern, must not fail") { + val rawDs: DataSource = SimpleDriverDataSource() + val proxyDs: DataSource = TransactionAwareDataSourceProxy(rawDs) + ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(OutboxAutoConfiguration::class.java)) + .withBean(OutboxStore::class.java, { stubStore() }) + .withBean(MessageDeliverer::class.java, { stubDeliverer() }) + .withInitializer { context -> + val gac = context as GenericApplicationContext + gac.registerBeanDefinition( + "outboxDs", + BeanDefinitionBuilder.genericBeanDefinition(DataSource::class.java) { proxyDs }.beanDefinition + .apply { isPrimary = true }, + ) + gac.registerBeanDefinition( + "outboxTm", + BeanDefinitionBuilder.genericBeanDefinition(PlatformTransactionManager::class.java) { + DataSourceTransactionManager(rawDs) + }.beanDefinition, + ) + } + .withPropertyValues("okapi.datasource-qualifier=outboxDs") + .run { ctx -> + ctx.startupFailure.shouldBeNull() + ctx.getBean(TransactionRunner::class.java).shouldBeInstanceOf() + } + } + + // Direct unit tests for the unwrapDataSource helper. The integration-level BUG C2 test above + // covers the single-level TransactionAwareDataSourceProxy case via real autoconfig wiring. + // These tests pin the helper's contract for nested chains, null targets, and cycles. + context("unwrapDataSource") { + test("returns the input unchanged when not a DelegatingDataSource") { + val raw: DataSource = SimpleDriverDataSource() + OutboxAutoConfiguration.unwrapDataSource(raw) shouldBeSameInstanceAs raw + } + + test("unwraps a single-level TransactionAwareDataSourceProxy to its target") { + val raw: DataSource = SimpleDriverDataSource() + val proxy: DataSource = TransactionAwareDataSourceProxy(raw) + OutboxAutoConfiguration.unwrapDataSource(proxy) shouldBeSameInstanceAs raw + } + + test("unwraps a nested chain TADP -> LCDP -> raw down to the raw DataSource") { + val raw: DataSource = SimpleDriverDataSource() + val nested: DataSource = TransactionAwareDataSourceProxy(LazyConnectionDataSourceProxy(raw)) + OutboxAutoConfiguration.unwrapDataSource(nested) shouldBeSameInstanceAs raw + } + + test("returns the proxy itself when DelegatingDataSource has null targetDataSource (graceful, no NPE)") { + // LazyConnectionDataSourceProxy ships with a no-arg constructor that leaves targetDataSource null + // until setTargetDataSource is called. At validation time some users may have such partially- + // initialised proxies. We must not NPE; we return the proxy itself so the caller's reference + // comparison can proceed (and produce a clear "DataSource mismatch" error if applicable). + val proxy: DataSource = LazyConnectionDataSourceProxy() + OutboxAutoConfiguration.unwrapDataSource(proxy) shouldBeSameInstanceAs proxy + } + + test("does NOT hang on a self-referencing DelegatingDataSource (cycle detection)").config(timeout = 2.seconds) { + val cyclic = LazyConnectionDataSourceProxy() + cyclic.setTargetDataSource(cyclic) + // After A3 fix: cycle detected, returns the cyclic proxy itself. + // Without A3 fix: tailrec compiles to goto → infinite CPU spin → test times out and fails. + OutboxAutoConfiguration.unwrapDataSource(cyclic) shouldBeSameInstanceAs cyclic + } + + test("does NOT hang on a longer cycle (A -> B -> A)").config(timeout = 2.seconds) { + val a = LazyConnectionDataSourceProxy() + val b = LazyConnectionDataSourceProxy() + a.setTargetDataSource(b) + b.setTargetDataSource(a) + // Walk: a -> b -> a; on the second visit to a the set already contains it, so the + // helper returns the current node (a). Any deterministic non-hanging answer is + // acceptable — the contract is "don't loop forever", and the caller will fall back + // to the WARN-or-error path because the unwrapped value isn't a real backing DataSource. + OutboxAutoConfiguration.unwrapDataSource(a) shouldBeSameInstanceAs a + } + } +}) + +// Mimics JpaTransactionManager / HibernateTransactionManager: implements ResourceTransactionManager +// but exposes a non-DataSource (EntityManagerFactory / SessionFactory) as the resource factory. +// JtaTransactionManager does NOT implement ResourceTransactionManager — it falls through the +// non-RTM branch alongside Exposed's SpringTransactionManager. +private class JpaLikeRtmTransactionManager(private val resourceFactory: Any) : + AbstractPlatformTransactionManager(), + ResourceTransactionManager { + override fun getResourceFactory(): Any = resourceFactory + override fun doGetTransaction(): Any = Any() + override fun doBegin(transaction: Any, definition: TransactionDefinition) {} + override fun doCommit(status: DefaultTransactionStatus) {} + override fun doRollback(status: DefaultTransactionStatus) {} +} + +@Configuration(proxyBeanMethods = false) +private class CustomRunnerConfiguration { + @Bean + fun customRunner(): TransactionRunner = RUNNER + + companion object { + val RUNNER: TransactionRunner = object : TransactionRunner { + override fun runInTransaction(block: () -> T): T = block() + } + } +} + +@Configuration(proxyBeanMethods = false) +private class CustomTransactionTemplateConfiguration { + @Bean + fun customTemplate(ptm: PlatformTransactionManager): org.springframework.transaction.support.TransactionTemplate { + TEMPLATE.transactionManager = ptm + return TEMPLATE + } + + companion object { + // Distinctive timeout makes the template easily identifiable; the autoconfig must not silently + // build its own TransactionTemplate(ptm) bypassing this one. + val TEMPLATE: org.springframework.transaction.support.TransactionTemplate = + org.springframework.transaction.support.TransactionTemplate().apply { + timeout = 42 + } + } +} + +@Configuration(proxyBeanMethods = false) +private class TwoPtmsNoPrimaryUserConfig { + @Bean + fun firstTm(): PlatformTransactionManager = DummyTransactionManager() + + @Bean + fun secondTm(): PlatformTransactionManager = DummyTransactionManager() +} + +@Configuration(proxyBeanMethods = false) +private class PrimaryDstAndDummyPtmConfig { + @Bean + @Primary + fun primaryTm(ds: DataSource): PlatformTransactionManager = DataSourceTransactionManager(ds) + + @Bean + fun secondaryDummyTm(): PlatformTransactionManager = DummyTransactionManager() +} + +private class DummyTransactionManager : AbstractPlatformTransactionManager() { + override fun doGetTransaction(): Any = Any() + override fun doBegin(transaction: Any, definition: org.springframework.transaction.TransactionDefinition) {} + override fun doCommit(status: DefaultTransactionStatus) {} + override fun doRollback(status: DefaultTransactionStatus) {} +} + +private fun h2DataSource(): DataSource = JdbcDataSource().apply { + setURL("jdbc:h2:mem:probe-tx-runner-${System.nanoTime()};DB_CLOSE_DELAY=-1") + user = "sa" + password = "" +} + +private fun stubStore() = object : OutboxStore { + override fun persist(entry: OutboxEntry) = entry + override fun claimPending(limit: Int) = emptyList() + override fun updateAfterProcessing(entry: OutboxEntry) = entry + override fun removeDeliveredBefore(time: Instant, limit: Int) = 0 + override fun findOldestCreatedAt(statuses: Set) = emptyMap() + override fun countByStatuses() = emptyMap() +} + +private fun stubDeliverer() = object : MessageDeliverer { + override val type = "stub" + override fun deliver(entry: OutboxEntry) = DeliveryResult.Success +} diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorAutoConfigurationTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorAutoConfigurationTest.kt index 38a60ac..15f0b5c 100644 --- a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorAutoConfigurationTest.kt +++ b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxProcessorAutoConfigurationTest.kt @@ -3,8 +3,10 @@ package com.softwaremill.okapi.springboot import com.softwaremill.okapi.core.DeliveryResult import com.softwaremill.okapi.core.MessageDeliverer import com.softwaremill.okapi.core.OutboxEntry +import com.softwaremill.okapi.core.OutboxEntryProcessor import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.core.OutboxStore +import com.softwaremill.okapi.core.TransactionRunner import com.softwaremill.okapi.micrometer.MicrometerOutboxListener import com.softwaremill.okapi.micrometer.MicrometerOutboxMetrics import com.softwaremill.okapi.micrometer.OutboxMetricsRefresher @@ -30,6 +32,7 @@ class OutboxProcessorAutoConfigurationTest : FunSpec({ .withBean(OutboxStore::class.java, { stubStore() }) .withBean(MessageDeliverer::class.java, { stubDeliverer() }) .withBean(DataSource::class.java, { SimpleDriverDataSource() }) + .withBean(TransactionRunner::class.java, { noOpTransactionRunner() }) test("processor bean is created by default") { contextRunner.run { ctx -> @@ -69,10 +72,19 @@ class OutboxProcessorAutoConfigurationTest : FunSpec({ } } - test("SmartLifecycle is running after context start") { + test("SmartLifecycle is running after context start, and stop() actually halts it") { contextRunner.run { ctx -> val scheduler = ctx.getBean(OutboxProcessorScheduler::class.java) scheduler.isRunning shouldBe true + scheduler.stop() + scheduler.isRunning shouldBe false + } + } + + test("getPhase returns PROCESSOR_PHASE constant (orders before purger)") { + contextRunner.run { ctx -> + val scheduler = ctx.getBean(OutboxProcessorScheduler::class.java) + scheduler.phase shouldBe OutboxProcessorScheduler.PROCESSOR_PHASE } } @@ -84,15 +96,26 @@ class OutboxProcessorAutoConfigurationTest : FunSpec({ } } - test("stop callback is always invoked") { + test("stop(callback) invokes callback AND actually halts the scheduler") { contextRunner.run { ctx -> val scheduler = ctx.getBean(OutboxProcessorScheduler::class.java) var callbackInvoked = false scheduler.stop { callbackInvoked = true } callbackInvoked shouldBe true + scheduler.isRunning shouldBe false } } + test("multiple MessageDeliverer beans are wrapped in CompositeMessageDeliverer (routed by deliveryType)") { + contextRunner + .withBean("secondDeliverer", MessageDeliverer::class.java, { stubDelivererWithType("second") }) + .run { ctx -> + val processor = ctx.getBean(OutboxEntryProcessor::class.java) + processor.shouldNotBeNull() + ctx.getBeansOfType(MessageDeliverer::class.java).size shouldBe 2 + } + } + test("listener, metrics and refresher are wired when a MeterRegistry bean is provided directly") { contextRunner .withBean(io.micrometer.core.instrument.MeterRegistry::class.java, { @@ -159,6 +182,7 @@ class OutboxProcessorAutoConfigurationTest : FunSpec({ .withBean(OutboxStore::class.java, { stubStore() }) .withBean(MessageDeliverer::class.java, { stubDeliverer() }) .withBean(DataSource::class.java, { SimpleDriverDataSource() }) + .withBean(TransactionRunner::class.java, { noOpTransactionRunner() }) .run { ctx -> ctx.getBean(io.micrometer.core.instrument.MeterRegistry::class.java).shouldNotBeNull() ctx.getBean(MicrometerOutboxListener::class.java).shouldNotBeNull() @@ -208,6 +232,10 @@ private fun resolveSpringBootClass(vararg candidateFqcns: String): Class<*> { } ?: error("None of $candidateFqcns resolves on this Spring Boot runtime; check spring-boot-starter-actuator on the test classpath.") } +private fun noOpTransactionRunner() = object : TransactionRunner { + override fun runInTransaction(block: () -> T): T = block() +} + private fun stubStore() = object : OutboxStore { override fun persist(entry: OutboxEntry) = entry override fun claimPending(limit: Int) = emptyList() @@ -217,7 +245,9 @@ private fun stubStore() = object : OutboxStore { override fun countByStatuses() = emptyMap() } -private fun stubDeliverer() = object : MessageDeliverer { - override val type = "stub" +private fun stubDeliverer() = stubDelivererWithType("stub") + +private fun stubDelivererWithType(t: String) = object : MessageDeliverer { + override val type = t override fun deliver(entry: OutboxEntry) = DeliveryResult.Success } diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerAutoConfigurationTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerAutoConfigurationTest.kt index 9c89735..ba7c6ca 100644 --- a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerAutoConfigurationTest.kt +++ b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/OutboxPurgerAutoConfigurationTest.kt @@ -6,6 +6,7 @@ import com.softwaremill.okapi.core.OutboxEntry import com.softwaremill.okapi.core.OutboxPurgerConfig import com.softwaremill.okapi.core.OutboxStatus import com.softwaremill.okapi.core.OutboxStore +import com.softwaremill.okapi.core.TransactionRunner import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.nulls.shouldNotBeNull import io.kotest.matchers.shouldBe @@ -25,6 +26,7 @@ class OutboxPurgerAutoConfigurationTest : FunSpec({ .withBean(OutboxStore::class.java, { stubStore() }) .withBean(MessageDeliverer::class.java, { stubDeliverer() }) .withBean(DataSource::class.java, { SimpleDriverDataSource() }) + .withBean(TransactionRunner::class.java, { noOpTransactionRunner() }) test("purger bean is created by default") { contextRunner.run { ctx -> @@ -64,10 +66,19 @@ class OutboxPurgerAutoConfigurationTest : FunSpec({ } } - test("SmartLifecycle is running after context start") { + test("SmartLifecycle is running after context start, and stop() actually halts it") { contextRunner.run { ctx -> val scheduler = ctx.getBean(OutboxPurgerScheduler::class.java) scheduler.isRunning shouldBe true + scheduler.stop() + scheduler.isRunning shouldBe false + } + } + + test("getPhase returns PURGER_PHASE constant (orders after processor)") { + contextRunner.run { ctx -> + val scheduler = ctx.getBean(OutboxPurgerScheduler::class.java) + scheduler.phase shouldBe OutboxPurgerScheduler.PURGER_PHASE } } @@ -79,9 +90,10 @@ class OutboxPurgerAutoConfigurationTest : FunSpec({ } } - test("stop callback is always invoked") { + test("stop(callback) invokes callback AND actually halts the purger") { val scheduler = OutboxPurgerScheduler( outboxStore = stubStore(), + transactionRunner = noOpTransactionRunner(), config = OutboxPurgerConfig(interval = ofMinutes(60)), ) scheduler.start() @@ -90,9 +102,14 @@ class OutboxPurgerAutoConfigurationTest : FunSpec({ scheduler.stop { callbackInvoked = true } callbackInvoked shouldBe true + scheduler.isRunning shouldBe false } }) +private fun noOpTransactionRunner() = object : TransactionRunner { + override fun runInTransaction(block: () -> T): T = block() +} + private fun stubStore() = object : OutboxStore { override fun persist(entry: OutboxEntry) = entry override fun claimPending(limit: Int) = emptyList() diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/SpringObjectProviderSemanticsAssumptionsTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/SpringObjectProviderSemanticsAssumptionsTest.kt new file mode 100644 index 0000000..b36f7ff --- /dev/null +++ b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/SpringObjectProviderSemanticsAssumptionsTest.kt @@ -0,0 +1,107 @@ +package com.softwaremill.okapi.springboot + +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.nulls.shouldBeNull +import io.kotest.matchers.shouldBe +import io.kotest.matchers.types.shouldBeSameInstanceAs +import org.springframework.beans.factory.NoUniqueBeanDefinitionException +import org.springframework.boot.test.context.runner.ApplicationContextRunner +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.context.annotation.Primary +import org.springframework.jdbc.datasource.DataSourceTransactionManager +import org.springframework.jdbc.datasource.SimpleDriverDataSource +import org.springframework.transaction.PlatformTransactionManager +import org.springframework.transaction.support.AbstractPlatformTransactionManager +import org.springframework.transaction.support.DefaultTransactionStatus +import org.springframework.transaction.support.ResourceTransactionManager + +/** + * Pins the Spring 7.0.7 `ObjectProvider` and `PlatformTransactionManager` semantics that + * `OutboxAutoConfiguration.okapiTransactionRunner` relies on. If a future Spring upgrade + * changes any of these behaviors, these tests fail loudly so the autoconfig logic gets + * audited rather than silently breaking. + * + * Specifically the autoconfig assumes: + * 1. `getIfUnique()` returns null (NOT throws) when 2+ PTM beans exist without `@Primary`. + * Used to distinguish "no PTM" from "multiple PTMs" via stream count. + * 2. `getIfUnique()` returns the `@Primary` bean when present. + * 3. `getIfAvailable()` THROWS `NoUniqueBeanDefinitionException` for 2+ non-primary + * candidates — the previous fix used this and surfaced a misleading error; switching to + * `getIfUnique()` was a deliberate semantic change. + * 4. `DataSourceTransactionManager` IS-A `ResourceTransactionManager`, with + * `resourceFactory == DataSource`. Our PTM↔DS validation depends on this cast. + * 5. PTMs that extend `AbstractPlatformTransactionManager` directly (e.g. Exposed + * `SpringTransactionManager`, `JpaTransactionManager`) do NOT implement + * `ResourceTransactionManager` — meaning we cannot extract their DataSource for validation + * and must fall back to a WARN log + require explicit `okapi.transaction-manager-qualifier`. + */ +class SpringObjectProviderSemanticsAssumptionsTest : FunSpec({ + + test("ObjectProvider.getIfUnique() returns the @Primary PTM when multiple PTMs exist") { + ApplicationContextRunner() + .withUserConfiguration(TwoPtmsWithPrimaryConfig::class.java) + .run { ctx -> + ctx.getBeanProvider(PlatformTransactionManager::class.java).getIfUnique() + .shouldBeSameInstanceAs(ctx.getBean("primaryTm", PlatformTransactionManager::class.java)) + } + } + + test("ObjectProvider.getIfUnique() returns null for 2+ non-primary PTMs (does NOT throw)") { + ApplicationContextRunner() + .withUserConfiguration(TwoPtmsNoPrimaryConfig::class.java) + .run { ctx -> + ctx.getBeanProvider(PlatformTransactionManager::class.java).getIfUnique().shouldBeNull() + } + } + + test( + "ObjectProvider.getIfAvailable() THROWS NoUniqueBeanDefinitionException for 2+ non-primary PTMs (this is why we use getIfUnique)", + ) { + ApplicationContextRunner() + .withUserConfiguration(TwoPtmsNoPrimaryConfig::class.java) + .run { ctx -> + val provider = ctx.getBeanProvider(PlatformTransactionManager::class.java) + val thrown = runCatching { provider.getIfAvailable() }.exceptionOrNull() + (thrown is NoUniqueBeanDefinitionException) shouldBe true + } + } + + test("DataSourceTransactionManager implements ResourceTransactionManager and exposes its DataSource via resourceFactory") { + val ds = SimpleDriverDataSource() + val dst: PlatformTransactionManager = DataSourceTransactionManager(ds) + (dst is ResourceTransactionManager) shouldBe true + (dst as ResourceTransactionManager).resourceFactory.shouldBeSameInstanceAs(ds) + } + + test("AbstractPlatformTransactionManager subclasses (e.g. Exposed bridge / JPA-style) do NOT implement ResourceTransactionManager") { + val tm: PlatformTransactionManager = DummyAbstractPtm() + (tm is ResourceTransactionManager) shouldBe false + } +}) + +@Configuration(proxyBeanMethods = false) +private class TwoPtmsNoPrimaryConfig { + @Bean + fun firstTm(): PlatformTransactionManager = DummyAbstractPtm() + + @Bean + fun secondTm(): PlatformTransactionManager = DummyAbstractPtm() +} + +@Configuration(proxyBeanMethods = false) +private class TwoPtmsWithPrimaryConfig { + @Bean + @Primary + fun primaryTm(): PlatformTransactionManager = DummyAbstractPtm() + + @Bean + fun secondaryTm(): PlatformTransactionManager = DummyAbstractPtm() +} + +private class DummyAbstractPtm : AbstractPlatformTransactionManager() { + override fun doGetTransaction(): Any = Any() + override fun doBegin(transaction: Any, definition: org.springframework.transaction.TransactionDefinition) {} + override fun doCommit(status: DefaultTransactionStatus) {} + override fun doRollback(status: DefaultTransactionStatus) {} +} diff --git a/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/TransactionTemplateHijackProofTest.kt b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/TransactionTemplateHijackProofTest.kt new file mode 100644 index 0000000..5302adb --- /dev/null +++ b/okapi-spring-boot/src/test/kotlin/com/softwaremill/okapi/springboot/TransactionTemplateHijackProofTest.kt @@ -0,0 +1,143 @@ +package com.softwaremill.okapi.springboot + +import com.softwaremill.okapi.core.DeliveryResult +import com.softwaremill.okapi.core.MessageDeliverer +import com.softwaremill.okapi.core.OutboxEntry +import com.softwaremill.okapi.core.OutboxStatus +import com.softwaremill.okapi.core.OutboxStore +import com.softwaremill.okapi.core.TransactionRunner +import io.kotest.assertions.withClue +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.nulls.shouldNotBeNull +import io.kotest.matchers.shouldBe +import io.kotest.matchers.shouldNotBe +import io.kotest.matchers.string.shouldContain +import io.kotest.matchers.types.shouldBeInstanceOf +import org.springframework.boot.autoconfigure.AutoConfigurations +import org.springframework.boot.test.context.runner.ApplicationContextRunner +import org.springframework.jdbc.datasource.DataSourceTransactionManager +import org.springframework.jdbc.datasource.SimpleDriverDataSource +import org.springframework.transaction.PlatformTransactionManager +import org.springframework.transaction.support.TransactionTemplate +import java.time.Instant +import javax.sql.DataSource + +/** + * Reliable proof tests against the ultrareview claim: + * "Spring Boot's auto-configured TransactionTemplate hijacks okapiTransactionRunner — the factory + * short-circuits to the Boot-supplied template and skips PTM↔DataSource validation entirely." + * + * The previous single-test version asserted only that startup failed in the mismatch scenario, then + * inferred "validation ran". That's brittle — context could fail for unrelated reasons, or autoconfig + * ordering in slice tests could differ from production. These three tests pin down three independent + * invariants so the conclusion is empirically forced: + * + * 1. PRECONDITION: Spring Boot's TransactionAutoConfiguration actually creates a `TransactionTemplate` + * bean in slice tests. If false, the hijack scenario cannot occur in this test harness and the + * other two tests prove nothing — the suite fails loudly instead of silently passing. + * + * 2. INTROSPECTION: in a single-DS happy path, `okapiTransactionRunner` produces a + * `SpringTransactionRunner` whose `TransactionTemplate.transactionManager` is the user's PTM. + * Combined with #1 this proves: even when Boot's TT bean exists, the factory does NOT pick it. + * (If the factory short-circuited via Boot's TT, the embedded PTM identity would not match.) + * + * 3. MISMATCH FAIL-FAST: in a 2-DS scenario with PTM bound to the wrong DS, startup fails with + * a literal substring from `validatePtmDataSourceMatch`'s `error(...)` message that no other + * Spring component emits — passing this assertion proves our validation code path was reached, + * not just that something failed. + */ +class TransactionTemplateHijackProofTest : FunSpec({ + + // Spring Boot 3.x: org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration + // Spring Boot 4.x: org.springframework.boot.transaction.autoconfigure.TransactionAutoConfiguration + val txAutoConfigClass: Class<*> = listOf( + "org.springframework.boot.transaction.autoconfigure.TransactionAutoConfiguration", + "org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration", + ).firstNotNullOfOrNull { fqcn -> + try { + Class.forName(fqcn) + } catch (_: ClassNotFoundException) { + null + } + } ?: error( + "TransactionAutoConfiguration not on test classpath. Without it the entire hijack scenario " + + "cannot be reproduced — failing the suite loudly rather than silently passing. Check that " + + "spring-boot-transaction (4.x) or spring-boot-autoconfigure (3.x) is on testRuntimeClasspath.", + ) + + test("PRECONDITION: Spring Boot's TransactionAutoConfiguration registers a TransactionTemplate bean") { + val ds: DataSource = SimpleDriverDataSource() + ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(txAutoConfigClass)) + .withBean(DataSource::class.java, { ds }) + .withBean(PlatformTransactionManager::class.java, { DataSourceTransactionManager(ds) }) + .run { ctx -> + withClue( + "If this fails, Spring Boot's TransactionTemplateConfiguration was not triggered in slice " + + "tests — the hijack tests below would silently pass without testing anything. Investigate " + + "before trusting test #2 / #3 results.", + ) { + ctx.getBean(TransactionTemplate::class.java).shouldNotBeNull() + } + } + } + + test("INTROSPECTION: with Boot's TT in context, okapiTransactionRunner builds a TT around OUR PTM") { + val ds: DataSource = SimpleDriverDataSource() + val ourPtm: PlatformTransactionManager = DataSourceTransactionManager(ds) + ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(OutboxAutoConfiguration::class.java, txAutoConfigClass)) + .withBean(DataSource::class.java, { ds }) + .withBean("ourPtm", PlatformTransactionManager::class.java, { ourPtm }) + .withBean(OutboxStore::class.java, { stubStore() }) + .withBean(MessageDeliverer::class.java, { stubDeliverer() }) + .run { ctx -> + ctx.startupFailure shouldBe null + val runner = ctx.getBean(TransactionRunner::class.java) + runner.shouldBeInstanceOf() + // The TT that okapiTransactionRunner wraps must be bound to OUR PTM — not Boot's + // auto-configured TT (which is the hijack failure mode). Reference identity, not + // equality, since each PTM is a distinct object. + withClue("okapiTransactionRunner is wrapping a TT whose internal PTM is NOT our ourPtm — hijack confirmed") { + runner.transactionTemplate.transactionManager shouldBe ourPtm + } + } + } + + test("MISMATCH FAIL-FAST: PTM bound to wrong DataSource triggers validatePtmDataSourceMatch error") { + val dsA: DataSource = SimpleDriverDataSource() + val dsB: DataSource = SimpleDriverDataSource() + ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(OutboxAutoConfiguration::class.java, txAutoConfigClass)) + .withBean( + "dsB", + DataSource::class.java, + { dsB }, + org.springframework.beans.factory.config.BeanDefinitionCustomizer { it.isPrimary = true }, + ) + .withBean("dsA", DataSource::class.java, { dsA }) + .withBean("dstA", PlatformTransactionManager::class.java, { DataSourceTransactionManager(dsA) }) + .withBean(OutboxStore::class.java, { stubStore() }) + .withBean(MessageDeliverer::class.java, { stubDeliverer() }) + .run { ctx -> + val failure = ctx.startupFailure + failure shouldNotBe null + failure!!.stackTraceToString() shouldContain + "is bound to a different DataSource than okapi's outbox DataSource" + } + } +}) + +private fun stubStore() = object : OutboxStore { + override fun persist(entry: OutboxEntry) = entry + override fun claimPending(limit: Int) = emptyList() + override fun updateAfterProcessing(entry: OutboxEntry) = entry + override fun removeDeliveredBefore(time: Instant, limit: Int) = 0 + override fun findOldestCreatedAt(statuses: Set) = emptyMap() + override fun countByStatuses() = emptyMap() +} + +private fun stubDeliverer() = object : MessageDeliverer { + override val type = "stub" + override fun deliver(entry: OutboxEntry) = DeliveryResult.Success +}