diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java index 835fb5b4ecf35..edf036a0f92fc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java @@ -40,6 +40,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; @@ -161,6 +162,52 @@ public void testBasicProducerConsumerTracing() throws Exception { "acknowledge"); } + @Test + public void testPartitionedProducerSpanUsesCurrentProducerTopic() throws Exception { + String topic = "persistent://prop/ns-abc/test-partitioned-producer-tracing"; + String partition0 = topic + "-partition-0"; + String partition1 = topic + "-partition-1"; + spanExporter.reset(); + + admin.topics().createPartitionedTopic(topic, 2); + + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .openTelemetry(openTelemetry) + .enableTracing(true) + .build(); + + Producer producer = client.newProducer(Schema.STRING) + .topic(topic) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .enableBatching(false) + .create(); + + producer.send("message-0"); + producer.send("message-1"); + + producer.close(); + client.close(); + + flushSpans(); + + List spans = spanExporter.getFinishedSpanItems(); + List producerSpans = spans.stream() + .filter(s -> s.getKind() == SpanKind.PRODUCER) + .collect(java.util.stream.Collectors.toList()); + Set producerSpanTopics = producerSpans.stream() + .map(s -> s.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("messaging.destination.name"))) + .collect(java.util.stream.Collectors.toSet()); + Set producerSpanNames = producerSpans.stream() + .map(SpanData::getName) + .collect(java.util.stream.Collectors.toSet()); + + assertEquals(producerSpans.size(), 2); + assertEquals(producerSpanTopics, Set.of(partition0, partition1)); + assertEquals(producerSpanNames, Set.of("send " + partition0, "send " + partition1)); + } + @Test public void testContextPropagationViaMessageProperties() throws Exception { String topic = "persistent://prop/ns-abc/test-context-propagation"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java index 3b092dd808ba3..b6204eedb5f93 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java @@ -45,7 +45,6 @@ public class OpenTelemetryProducerInterceptor implements ProducerInterceptor { private final Tracer tracer; private final TextMapPropagator propagator; - private String topic; public OpenTelemetryProducerInterceptor(InstrumentProvider instrumentProvider) { this.tracer = instrumentProvider.getTracer(); @@ -70,9 +69,7 @@ public Message beforeSend(Producer producer, Message message) { } try { - if (topic == null) { - topic = producer.getTopic(); - } + String topic = producer.getTopic(); // Create a span for this message publication // The span will be linked to the current context, which may have been set by: @@ -121,4 +118,4 @@ public void onSendAcknowledgement(Producer producer, Message message, Mess } } } -} \ No newline at end of file +}