Skip to content

Commit cdb08c9

Browse files
author
Mateusz Rzeszutek
authored
Instrument reactor-kafka (#8439)
1 parent f003932 commit cdb08c9

File tree

13 files changed

+753
-0
lines changed

13 files changed

+753
-0
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerContextUtil.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,5 +51,10 @@ public static void set(ConsumerRecords<?, ?> records, Context context, Consumer<
5151
recordsConsumerField.set(records, consumer);
5252
}
5353

54+
public static void copy(ConsumerRecord<?, ?> from, ConsumerRecord<?, ?> to) {
55+
recordContextField.set(to, recordContextField.get(from));
56+
recordConsumerField.set(to, recordConsumerField.get(from));
57+
}
58+
5459
private KafkaConsumerContextUtil() {}
5560
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
plugins {
2+
id("otel.javaagent-instrumentation")
3+
}
4+
5+
muzzle {
6+
pass {
7+
group.set("io.projectreactor.kafka")
8+
module.set("reactor-kafka")
9+
// TODO: add support for 1.3
10+
versions.set("[1.0.0,1.3.0)")
11+
}
12+
}
13+
14+
dependencies {
15+
compileOnly("com.google.auto.value:auto-value-annotations")
16+
annotationProcessor("com.google.auto.value:auto-value")
17+
18+
bootstrap(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:bootstrap"))
19+
20+
implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library"))
21+
implementation(project(":instrumentation:reactor:reactor-3.1:library"))
22+
23+
library("io.projectreactor.kafka:reactor-kafka:1.0.0.RELEASE")
24+
25+
testInstrumentation(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent"))
26+
testInstrumentation(project(":instrumentation:reactor:reactor-3.1:javaagent"))
27+
28+
testImplementation("org.testcontainers:kafka")
29+
30+
testLibrary("io.projectreactor:reactor-test:3.1.0.RELEASE")
31+
32+
latestDepTestLibrary("io.projectreactor:reactor-core:3.4.+")
33+
// TODO: add support for 1.3
34+
latestDepTestLibrary("io.projectreactor.kafka:reactor-kafka:1.2.+")
35+
}
36+
37+
tasks {
38+
test {
39+
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
40+
41+
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
42+
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
43+
}
44+
45+
check {
46+
dependsOn(testing.suites)
47+
}
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;
7+
8+
import static net.bytebuddy.matcher.ElementMatchers.named;
9+
import static net.bytebuddy.matcher.ElementMatchers.returns;
10+
11+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
12+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
13+
import net.bytebuddy.asm.Advice;
14+
import net.bytebuddy.description.type.TypeDescription;
15+
import net.bytebuddy.matcher.ElementMatcher;
16+
import reactor.core.publisher.Flux;
17+
18+
// handles versions 1.0.0 - 1.2.+
19+
public class DefaultKafkaReceiverInstrumentation implements TypeInstrumentation {
20+
21+
@Override
22+
public ElementMatcher<TypeDescription> typeMatcher() {
23+
return named("reactor.kafka.receiver.internals.DefaultKafkaReceiver");
24+
}
25+
26+
@Override
27+
public void transform(TypeTransformer transformer) {
28+
transformer.applyAdviceToMethod(
29+
named("createConsumerFlux").and(returns(named("reactor.core.publisher.Flux"))),
30+
this.getClass().getName() + "$CreateConsumerFluxAdvice");
31+
}
32+
33+
@SuppressWarnings("unused")
34+
public static class CreateConsumerFluxAdvice {
35+
36+
@Advice.OnMethodExit(suppress = Throwable.class)
37+
public static void onExit(@Advice.Return(readOnly = false) Flux<?> flux) {
38+
if (!(flux instanceof TracingDisablingKafkaFlux)) {
39+
flux = new TracingDisablingKafkaFlux<>(flux);
40+
}
41+
}
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;
7+
8+
import static io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0.ReactorKafkaSingletons.processInstrumenter;
9+
10+
import io.opentelemetry.context.Context;
11+
import io.opentelemetry.context.Scope;
12+
import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext;
13+
import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil;
14+
import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest;
15+
import io.opentelemetry.instrumentation.reactor.v3_1.ContextPropagationOperator;
16+
import org.apache.kafka.clients.consumer.ConsumerRecord;
17+
import org.reactivestreams.Subscription;
18+
import reactor.core.CoreSubscriber;
19+
import reactor.core.Scannable;
20+
import reactor.core.publisher.Flux;
21+
import reactor.core.publisher.FluxOperator;
22+
import reactor.core.publisher.Operators;
23+
24+
final class InstrumentedKafkaFlux<R extends ConsumerRecord<?, ?>> extends FluxOperator<R, R> {
25+
26+
InstrumentedKafkaFlux(Flux<R> source) {
27+
super(source);
28+
}
29+
30+
@Override
31+
@SuppressWarnings("unchecked")
32+
public void subscribe(CoreSubscriber<? super R> actual) {
33+
source.subscribe(new InstrumentedSubscriber((CoreSubscriber<ConsumerRecord<?, ?>>) actual));
34+
}
35+
36+
static final class InstrumentedSubscriber
37+
implements CoreSubscriber<ConsumerRecord<?, ?>>, Subscription, Scannable {
38+
39+
private final CoreSubscriber<ConsumerRecord<?, ?>> actual;
40+
private final Context currentContext;
41+
private Subscription subscription;
42+
43+
InstrumentedSubscriber(CoreSubscriber<ConsumerRecord<?, ?>> actual) {
44+
this.actual = actual;
45+
currentContext =
46+
ContextPropagationOperator.getOpenTelemetryContext(
47+
actual.currentContext(), Context.current());
48+
}
49+
50+
@Override
51+
public void onSubscribe(Subscription s) {
52+
if (Operators.validate(this.subscription, s)) {
53+
this.subscription = s;
54+
55+
actual.onSubscribe(this);
56+
}
57+
}
58+
59+
@Override
60+
public reactor.util.context.Context currentContext() {
61+
return actual.currentContext();
62+
}
63+
64+
@Override
65+
public void onNext(ConsumerRecord<?, ?> record) {
66+
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(record);
67+
Context receiveContext = consumerContext.getContext();
68+
// use the receive CONSUMER span as parent if it's available
69+
Context parentContext = receiveContext != null ? receiveContext : currentContext;
70+
71+
KafkaProcessRequest request = KafkaProcessRequest.create(consumerContext, record);
72+
if (!processInstrumenter().shouldStart(parentContext, request)) {
73+
actual.onNext(record);
74+
return;
75+
}
76+
77+
Context context = processInstrumenter().start(parentContext, request);
78+
Throwable error = null;
79+
try (Scope ignored = context.makeCurrent()) {
80+
actual.onNext(record);
81+
} catch (Throwable t) {
82+
error = t;
83+
throw t;
84+
} finally {
85+
processInstrumenter().end(context, request, null, error);
86+
}
87+
}
88+
89+
@Override
90+
public void onError(Throwable throwable) {
91+
try (Scope ignored = currentContext.makeCurrent()) {
92+
actual.onError(throwable);
93+
}
94+
}
95+
96+
@Override
97+
public void onComplete() {
98+
try (Scope ignored = currentContext.makeCurrent()) {
99+
actual.onComplete();
100+
}
101+
}
102+
103+
@Override
104+
public void request(long l) {
105+
subscription.request(l);
106+
}
107+
108+
@Override
109+
public void cancel() {
110+
subscription.cancel();
111+
}
112+
113+
@SuppressWarnings("rawtypes") // that's how the method is defined
114+
@Override
115+
public Object scanUnsafe(Attr key) {
116+
if (key == Attr.ACTUAL) {
117+
return actual;
118+
}
119+
if (key == Attr.PARENT) {
120+
return subscription;
121+
}
122+
return null;
123+
}
124+
}
125+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;
7+
8+
import java.util.function.Function;
9+
import org.apache.kafka.clients.consumer.Consumer;
10+
import org.apache.kafka.clients.consumer.ConsumerRecord;
11+
import reactor.core.publisher.Flux;
12+
import reactor.core.publisher.Mono;
13+
import reactor.kafka.receiver.KafkaReceiver;
14+
import reactor.kafka.receiver.ReceiverRecord;
15+
import reactor.kafka.sender.TransactionManager;
16+
17+
public final class InstrumentedKafkaReceiver<K, V> implements KafkaReceiver<K, V> {
18+
19+
private final KafkaReceiver<K, V> actual;
20+
21+
public InstrumentedKafkaReceiver(KafkaReceiver<K, V> actual) {
22+
this.actual = actual;
23+
}
24+
25+
@Override
26+
public Flux<ReceiverRecord<K, V>> receive() {
27+
return new InstrumentedKafkaFlux<>(actual.receive());
28+
}
29+
30+
@Override
31+
public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck() {
32+
return actual.receiveAutoAck().map(InstrumentedKafkaFlux::new);
33+
}
34+
35+
@Override
36+
public Flux<ConsumerRecord<K, V>> receiveAtmostOnce() {
37+
return new InstrumentedKafkaFlux<>(actual.receiveAtmostOnce());
38+
}
39+
40+
@Override
41+
public Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(
42+
TransactionManager transactionManager) {
43+
return actual.receiveAutoAck().map(InstrumentedKafkaFlux::new);
44+
}
45+
46+
@Override
47+
public <T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function) {
48+
return actual.doOnConsumer(function);
49+
}
50+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;
7+
8+
import static net.bytebuddy.matcher.ElementMatchers.isStatic;
9+
import static net.bytebuddy.matcher.ElementMatchers.named;
10+
import static net.bytebuddy.matcher.ElementMatchers.returns;
11+
12+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
13+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
14+
import net.bytebuddy.asm.Advice;
15+
import net.bytebuddy.description.type.TypeDescription;
16+
import net.bytebuddy.matcher.ElementMatcher;
17+
import reactor.kafka.receiver.KafkaReceiver;
18+
19+
public class KafkaReceiverInstrumentation implements TypeInstrumentation {
20+
21+
@Override
22+
public ElementMatcher<TypeDescription> typeMatcher() {
23+
return named("reactor.kafka.receiver.KafkaReceiver");
24+
}
25+
26+
@Override
27+
public void transform(TypeTransformer transformer) {
28+
transformer.applyAdviceToMethod(
29+
named("create").and(isStatic()).and(returns(named("reactor.kafka.receiver.KafkaReceiver"))),
30+
this.getClass().getName() + "$CreateAdvice");
31+
}
32+
33+
@SuppressWarnings("unused")
34+
public static class CreateAdvice {
35+
36+
@Advice.OnMethodExit(suppress = Throwable.class)
37+
public static void onExit(@Advice.Return(readOnly = false) KafkaReceiver<?, ?> receiver) {
38+
if (!(receiver instanceof InstrumentedKafkaReceiver)) {
39+
receiver = new InstrumentedKafkaReceiver<>(receiver);
40+
}
41+
}
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;
7+
8+
import static java.util.Arrays.asList;
9+
10+
import com.google.auto.service.AutoService;
11+
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
12+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
13+
import java.util.List;
14+
15+
@AutoService(InstrumentationModule.class)
16+
public class ReactorKafkaInstrumentationModule extends InstrumentationModule {
17+
18+
public ReactorKafkaInstrumentationModule() {
19+
super("reactor-kafka", "reactor-kafka-1.0");
20+
}
21+
22+
@Override
23+
public List<TypeInstrumentation> typeInstrumentations() {
24+
return asList(
25+
new KafkaReceiverInstrumentation(),
26+
new ReceiverRecordInstrumentation(),
27+
new DefaultKafkaReceiverInstrumentation());
28+
}
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;
7+
8+
import io.opentelemetry.api.GlobalOpenTelemetry;
9+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
10+
import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory;
11+
import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest;
12+
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
13+
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
14+
15+
final class ReactorKafkaSingletons {
16+
17+
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.reactor-kafka-1.0";
18+
19+
private static final Instrumenter<KafkaProcessRequest, Void> PROCESS_INSTRUMENTER =
20+
new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME)
21+
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
22+
.setCaptureExperimentalSpanAttributes(
23+
InstrumentationConfig.get()
24+
.getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))
25+
.setMessagingReceiveInstrumentationEnabled(
26+
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
27+
.createConsumerProcessInstrumenter();
28+
29+
public static Instrumenter<KafkaProcessRequest, Void> processInstrumenter() {
30+
return PROCESS_INSTRUMENTER;
31+
}
32+
33+
private ReactorKafkaSingletons() {}
34+
}

0 commit comments

Comments
 (0)