Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Features

- Add `sentry-kafka` module for Kafka queue instrumentation without Spring ([#5288](https://github.com/getsentry/sentry-java/pull/5288))
- Add Kafka queue tracing for Spring Boot 3 ([#5254](https://github.com/getsentry/sentry-java/pull/5254)), ([#5255](https://github.com/getsentry/sentry-java/pull/5255)), ([#5256](https://github.com/getsentry/sentry-java/pull/5256))
- Add `enableQueueTracing` option and messaging span data conventions ([#5250](https://github.com/getsentry/sentry-java/pull/5250))
- Prevent cross-organization trace continuation ([#5136](https://github.com/getsentry/sentry-java/pull/5136))
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Sentry SDK for Java and Android
| sentry | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry?style=for-the-badge&logo=sentry&color=green) | 21 |
| sentry-jul | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-jul?style=for-the-badge&logo=sentry&color=green) |
| sentry-jdbc | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-jdbc?style=for-the-badge&logo=sentry&color=green) |
| sentry-kafka | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-kafka?style=for-the-badge&logo=sentry&color=green) |
| sentry-apollo | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-apollo?style=for-the-badge&logo=sentry&color=green) | 21 |
| sentry-apollo-3 | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-apollo-3?style=for-the-badge&logo=sentry&color=green) | 21 |
| sentry-apollo-4 | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-apollo-4?style=for-the-badge&logo=sentry&color=green) | 21 |
Expand Down
1 change: 1 addition & 0 deletions buildSrc/src/main/java/Config.kt
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ object Config {
val SENTRY_JCACHE_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.jcache"
val SENTRY_QUARTZ_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.quartz"
val SENTRY_JDBC_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.jdbc"
val SENTRY_KAFKA_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.kafka"
val SENTRY_OPENFEATURE_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.openfeature"
val SENTRY_LAUNCHDARKLY_SERVER_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.launchdarkly-server"
val SENTRY_LAUNCHDARKLY_ANDROID_SDK_NAME = "$SENTRY_ANDROID_SDK_NAME.launchdarkly"
Expand Down
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ springboot3-starter-jdbc = { module = "org.springframework.boot:spring-boot-star
springboot3-starter-actuator = { module = "org.springframework.boot:spring-boot-starter-actuator", version.ref = "springboot3" }
springboot3-starter-cache = { module = "org.springframework.boot:spring-boot-starter-cache", version.ref = "springboot3" }
spring-kafka3 = { module = "org.springframework.kafka:spring-kafka", version = "3.3.5" }
kafka-clients = { module = "org.apache.kafka:kafka-clients", version = "3.8.1" }
springboot4-otel = { module = "io.opentelemetry.instrumentation:opentelemetry-spring-boot-starter", version.ref = "otelInstrumentation" }
springboot4-resttestclient = { module = "org.springframework.boot:spring-boot-resttestclient", version.ref = "springboot4" }
springboot4-starter = { module = "org.springframework.boot:spring-boot-starter", version.ref = "springboot4" }
Expand Down
5 changes: 5 additions & 0 deletions sentry-kafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# sentry-kafka

This module provides Kafka-native queue instrumentation for applications using `kafka-clients` directly.

Spring users should use `sentry-spring-boot-jakarta` / `sentry-spring-jakarta`, which provide higher-fidelity consumer instrumentation via Spring Kafka hooks.
25 changes: 25 additions & 0 deletions sentry-kafka/api/sentry-kafka.api
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
public final class io/sentry/kafka/BuildConfig {
public static final field SENTRY_KAFKA_SDK_NAME Ljava/lang/String;
public static final field VERSION_NAME Ljava/lang/String;
}

public final class io/sentry/kafka/SentryKafkaConsumerInterceptor : org/apache/kafka/clients/consumer/ConsumerInterceptor {
public static final field TRACE_ORIGIN Ljava/lang/String;
public fun <init> (Lio/sentry/IScopes;)V
public fun close ()V
public fun configure (Ljava/util/Map;)V
public fun onCommit (Ljava/util/Map;)V
public fun onConsume (Lorg/apache/kafka/clients/consumer/ConsumerRecords;)Lorg/apache/kafka/clients/consumer/ConsumerRecords;
}

public final class io/sentry/kafka/SentryKafkaProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor {
public static final field SENTRY_ENQUEUED_TIME_HEADER Ljava/lang/String;
public static final field TRACE_ORIGIN Ljava/lang/String;
public fun <init> (Lio/sentry/IScopes;)V
public fun <init> (Lio/sentry/IScopes;Ljava/lang/String;)V
public fun close ()V
public fun configure (Ljava/util/Map;)V
public fun onAcknowledgement (Lorg/apache/kafka/clients/producer/RecordMetadata;Ljava/lang/Exception;)V
public fun onSend (Lorg/apache/kafka/clients/producer/ProducerRecord;)Lorg/apache/kafka/clients/producer/ProducerRecord;
}

83 changes: 83 additions & 0 deletions sentry-kafka/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import net.ltgt.gradle.errorprone.errorprone
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
`java-library`
id("io.sentry.javadoc")
alias(libs.plugins.kotlin.jvm)
jacoco
alias(libs.plugins.errorprone)
alias(libs.plugins.gradle.versions)
alias(libs.plugins.buildconfig)
}

tasks.withType<KotlinCompile>().configureEach {
compilerOptions.jvmTarget = org.jetbrains.kotlin.gradle.dsl.JvmTarget.JVM_1_8
}

dependencies {
api(projects.sentry)
compileOnly(libs.kafka.clients)
compileOnly(libs.jetbrains.annotations)
compileOnly(libs.nopen.annotations)

errorprone(libs.errorprone.core)
errorprone(libs.nopen.checker)
errorprone(libs.nullaway)

// tests
testImplementation(projects.sentryTestSupport)
testImplementation(kotlin(Config.kotlinStdLib))
testImplementation(libs.kotlin.test.junit)
testImplementation(libs.mockito.kotlin)
testImplementation(libs.mockito.inline)
testImplementation(libs.kafka.clients)
}

configure<SourceSetContainer> { test { java.srcDir("src/test/java") } }

jacoco { toolVersion = libs.versions.jacoco.get() }

tasks.jacocoTestReport {
reports {
xml.required.set(true)
html.required.set(false)
}
}

tasks {
jacocoTestCoverageVerification {
violationRules { rule { limit { minimum = Config.QualityPlugins.Jacoco.minimumCoverage } } }
}
check {
dependsOn(jacocoTestCoverageVerification)
dependsOn(jacocoTestReport)
}
}

tasks.withType<JavaCompile>().configureEach {
options.errorprone {
check("NullAway", net.ltgt.gradle.errorprone.CheckSeverity.ERROR)
option("NullAway:AnnotatedPackages", "io.sentry")
}
}

buildConfig {
useJavaOutput()
packageName("io.sentry.kafka")
buildConfigField("String", "SENTRY_KAFKA_SDK_NAME", "\"${Config.Sentry.SENTRY_KAFKA_SDK_NAME}\"")
buildConfigField("String", "VERSION_NAME", "\"${project.version}\"")
}

tasks.jar {
manifest {
attributes(
"Sentry-Version-Name" to project.version,
"Sentry-SDK-Name" to Config.Sentry.SENTRY_KAFKA_SDK_NAME,
"Sentry-SDK-Package-Name" to "maven:io.sentry:sentry-kafka",
"Implementation-Vendor" to "Sentry",
"Implementation-Title" to project.name,
"Implementation-Version" to project.version,
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package io.sentry.kafka;

import io.sentry.BaggageHeader;
import io.sentry.IScopes;
import io.sentry.ITransaction;
import io.sentry.SentryTraceHeader;
import io.sentry.SpanDataConvention;
import io.sentry.SpanStatus;
import io.sentry.TransactionContext;
import io.sentry.TransactionOptions;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

@ApiStatus.Internal
public final class SentryKafkaConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {

public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.consumer";

private final @NotNull IScopes scopes;

public SentryKafkaConsumerInterceptor(final @NotNull IScopes scopes) {
this.scopes = scopes;
}
Comment on lines +26 to +34
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: SentryKafkaConsumerInterceptor and SentryKafkaProducerInterceptor lack public no-arg constructors, preventing instantiation by Kafka when configured via class name.
Severity: HIGH

Suggested Fix

Add a public, no-argument constructor to both SentryKafkaConsumerInterceptor and SentryKafkaProducerInterceptor. The required IScopes dependency could then be injected via the configure() method, which Kafka calls after instantiation.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location:
sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java#L26-L34

Potential issue: The `SentryKafkaConsumerInterceptor` and
`SentryKafkaProducerInterceptor` classes only have constructors that require an
`IScopes` parameter. However, when these interceptors are configured in a non-Spring
application using the standard Kafka `interceptor.classes` property, Kafka's client
library attempts to instantiate them using reflection, which requires a public
no-argument constructor. The absence of this constructor will cause an
`InstantiationException` at producer or consumer startup, preventing the interceptors
from being used as intended in non-Spring environments.

Did we get this right? 👍 / 👎 to inform future reviews.


@Override
public @NotNull ConsumerRecords<K, V> onConsume(final @NotNull ConsumerRecords<K, V> records) {
if (!scopes.getOptions().isEnableQueueTracing() || records.isEmpty()) {
return records;
}

final @NotNull ConsumerRecord<K, V> firstRecord = records.iterator().next();

try {
final @Nullable TransactionContext continued = continueTrace(firstRecord);
final @NotNull TransactionContext txContext =
continued != null ? continued : new TransactionContext("queue.receive", "queue.receive");
txContext.setName("queue.receive");
txContext.setOperation("queue.receive");

final @NotNull TransactionOptions txOptions = new TransactionOptions();
txOptions.setOrigin(TRACE_ORIGIN);
txOptions.setBindToScope(false);

final @NotNull ITransaction transaction = scopes.startTransaction(txContext, txOptions);
if (!transaction.isNoOp()) {
transaction.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka");
transaction.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, firstRecord.topic());
transaction.setData("messaging.batch.message.count", records.count());
transaction.setStatus(SpanStatus.OK);
transaction.finish();
Comment on lines +59 to +61
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The transaction in SentryKafkaConsumerInterceptor finishes immediately with a hardcoded OK status and is not bound to the scope, failing to measure actual message processing.
Severity: MEDIUM

Suggested Fix

Instead of creating and finishing a transaction, consider starting a transaction and binding it to the scope. This would allow user code to retrieve the active span and finish it after message processing is complete, accurately capturing its duration and status. Document this pattern for non-Spring users.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location:
sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java#L59-L61

Potential issue: In `SentryKafkaConsumerInterceptor`, a transaction is created and
finished within the `onConsume` method itself. It is explicitly not bound to the scope
(`setBindToScope(false)`) and is always marked with `SpanStatus.OK`. This means the
transaction only measures the interceptor's execution time (microseconds) and does not
capture the duration or result of the actual message processing, providing limited
observability value.

Did we get this right? 👍 / 👎 to inform future reviews.

}
} catch (Throwable ignored) {
// Instrumentation must never break the customer's Kafka poll loop.
}

return records;
}

@Override
public void onCommit(final @NotNull Map<TopicPartition, OffsetAndMetadata> offsets) {}

@Override
public void close() {}

@Override
public void configure(final @Nullable Map<String, ?> configs) {}

private @Nullable TransactionContext continueTrace(final @NotNull ConsumerRecord<K, V> record) {
final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER);
final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER);
final @Nullable List<String> baggageHeaders =
baggage != null ? Collections.singletonList(baggage) : null;
return scopes.continueTrace(sentryTrace, baggageHeaders);
}

private @Nullable String headerValue(
final @NotNull ConsumerRecord<K, V> record, final @NotNull String headerName) {
final @Nullable Header header = record.headers().lastHeader(headerName);
if (header == null || header.value() == null) {
return null;
}
return new String(header.value(), StandardCharsets.UTF_8);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.sentry.spring.jakarta.kafka;
package io.sentry.kafka;

import io.sentry.BaggageHeader;
import io.sentry.DateUtils;
Expand All @@ -19,28 +19,23 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
* A Kafka {@link ProducerInterceptor} that creates {@code queue.publish} spans and injects tracing
* headers into outgoing records.
*
* <p>The span starts and finishes synchronously in {@link #onSend(ProducerRecord)}, representing
* "message enqueued" semantics. This avoids cross-thread correlation complexity since {@link
* #onAcknowledgement(RecordMetadata, Exception)} runs on the Kafka I/O thread.
*
* <p>If the customer already has a {@link ProducerInterceptor}, the {@link
* SentryKafkaProducerBeanPostProcessor} composes both using Spring's {@link
* org.springframework.kafka.support.CompositeProducerInterceptor}.
*/
@ApiStatus.Internal
public final class SentryProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
public final class SentryKafkaProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {

static final String TRACE_ORIGIN = "auto.queue.spring_jakarta.kafka.producer";
static final String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time";
public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.producer";
public static final @NotNull String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time";

private final @NotNull IScopes scopes;
private final @NotNull String traceOrigin;

public SentryProducerInterceptor(final @NotNull IScopes scopes) {
public SentryKafkaProducerInterceptor(final @NotNull IScopes scopes) {
this(scopes, TRACE_ORIGIN);
}

public SentryKafkaProducerInterceptor(
final @NotNull IScopes scopes, final @NotNull String traceOrigin) {
this.scopes = scopes;
this.traceOrigin = traceOrigin;
}

@Override
Expand All @@ -56,7 +51,7 @@ public SentryProducerInterceptor(final @NotNull IScopes scopes) {

try {
final @NotNull SpanOptions spanOptions = new SpanOptions();
spanOptions.setOrigin(TRACE_ORIGIN);
spanOptions.setOrigin(traceOrigin);
final @NotNull ISpan span =
activeSpan.startChild("queue.publish", record.topic(), spanOptions);
if (span.isNoOp()) {
Expand All @@ -71,7 +66,7 @@ public SentryProducerInterceptor(final @NotNull IScopes scopes) {
span.setStatus(SpanStatus.OK);
span.finish();
} catch (Throwable ignored) {
// Instrumentation must never break the customer's Kafka send
// Instrumentation must never break the customer's Kafka send.
}

return record;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package io.sentry.kafka

import io.sentry.IScopes
import io.sentry.ITransaction
import io.sentry.SentryOptions
import io.sentry.TransactionContext
import io.sentry.TransactionOptions
import kotlin.test.Test
import kotlin.test.assertSame
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.mockito.kotlin.any
import org.mockito.kotlin.mock
import org.mockito.kotlin.never
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever

class SentryKafkaConsumerInterceptorTest {

@Test
fun `does nothing when queue tracing is disabled`() {
val scopes = mock<IScopes>()
val options = SentryOptions().apply { isEnableQueueTracing = false }
whenever(scopes.options).thenReturn(options)

val interceptor = SentryKafkaConsumerInterceptor<String, String>(scopes)
val records = singleRecordBatch()

val result = interceptor.onConsume(records)

assertSame(records, result)
verify(scopes, never()).startTransaction(any<TransactionContext>(), any<TransactionOptions>())
}

@Test
fun `starts and finishes queue receive transaction for consumed batch`() {
val scopes = mock<IScopes>()
val options = SentryOptions().apply { isEnableQueueTracing = true }
val transaction = mock<ITransaction>()

whenever(scopes.options).thenReturn(options)
whenever(scopes.continueTrace(any(), any())).thenReturn(null)
whenever(scopes.startTransaction(any<TransactionContext>(), any<TransactionOptions>()))
.thenReturn(transaction)
whenever(transaction.isNoOp).thenReturn(false)

val interceptor = SentryKafkaConsumerInterceptor<String, String>(scopes)

interceptor.onConsume(singleRecordBatch())

verify(scopes).startTransaction(any<TransactionContext>(), any<TransactionOptions>())
verify(transaction).setData("messaging.system", "kafka")
verify(transaction).setData("messaging.destination.name", "my-topic")
verify(transaction).setData("messaging.batch.message.count", 1)
verify(transaction).finish()
}

@Test
fun `commit callback is no-op`() {
val interceptor = SentryKafkaConsumerInterceptor<String, String>(mock())

interceptor.onCommit(mapOf(TopicPartition("my-topic", 0) to OffsetAndMetadata(1)))
}

private fun singleRecordBatch(): ConsumerRecords<String, String> {
val partition = TopicPartition("my-topic", 0)
val record = ConsumerRecord("my-topic", 0, 0L, "key", "value")
return ConsumerRecords(mapOf(partition to listOf(record)))
}
}
Loading
Loading