Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -421,11 +421,20 @@ public void testMultiTopicConsumerTracing() throws Exception {

// Verify spans for both topics
List<SpanData> spans = spanExporter.getFinishedSpanItems();
long consumerSpans = spans.stream()
List<SpanData> consumerSpans = spans.stream()
.filter(s -> s.getKind() == SpanKind.CONSUMER)
.count();

assertEquals(consumerSpans, 2);
.collect(java.util.stream.Collectors.toList());
Set<String> consumerSpanTopics = consumerSpans.stream()
.map(s -> s.getAttributes().get(
io.opentelemetry.api.common.AttributeKey.stringKey("messaging.destination.name")))
.collect(java.util.stream.Collectors.toSet());
Set<String> consumerSpanNames = consumerSpans.stream()
.map(SpanData::getName)
.collect(java.util.stream.Collectors.toSet());

assertEquals(consumerSpans.size(), 2);
assertEquals(consumerSpanTopics, Set.of(topic1, topic2));
assertEquals(consumerSpanNames, Set.of("process " + topic1, "process " + topic2));

producer1.close();
producer2.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ public class OpenTelemetryConsumerInterceptor<T> implements ConsumerInterceptor<

private final Tracer tracer;
private final TextMapPropagator propagator;
private String topic;
/**
* Topic returned by the consumer. Used as a fallback when a message has no topic name,
* and as the topic key for acknowledgment callbacks that only carry a MessageId.
*/
private String consumerTopic;
private String subscription;

/**
Expand Down Expand Up @@ -92,7 +96,15 @@ private String getTopicKey(MessageId messageId) {
if (messageId instanceof TopicMessageId) {
return ((TopicMessageId) messageId).getOwnerTopic();
}
return topic != null ? topic : "";
return consumerTopic != null ? consumerTopic : "";
}

private String getMessageTopic(Message<T> message) {
String messageTopic = message.getTopicName();
if (messageTopic != null && !messageTopic.isEmpty()) {
return messageTopic;
}
return consumerTopic != null ? consumerTopic : "";
}

@Override
Expand All @@ -111,15 +123,16 @@ public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
}

try {
if (topic == null) {
topic = consumer.getTopic();
if (consumerTopic == null) {
consumerTopic = consumer.getTopic();
}
if (subscription == null) {
subscription = consumer.getSubscription();
}
String messageTopic = getMessageTopic(message);

// Create a consumer span for this message
Span span = TracingContext.createConsumerSpan(tracer, topic, subscription, message, propagator);
Span span = TracingContext.createConsumerSpan(tracer, messageTopic, subscription, message, propagator);

if (TracingContext.isValid(span)) {
MessageId messageId = message.getMessageId();
Expand All @@ -137,7 +150,7 @@ public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
}

log.debug().attr("messageId", messageId)
.attr("topic", topic)
.attr("topic", messageTopic)
.log("Created consumer span");
}
} catch (Exception e) {
Expand Down Expand Up @@ -334,4 +347,4 @@ public void onAckTimeoutSend(Consumer<T> consumer, Set<MessageId> messageIds) {
}
}
}
}
}