in apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/SimpleConsumerImplAsyncInterceptor.java [56:99]
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                          Object ret) throws Throwable {
    // Purpose: decorate the CompletableFuture returned by the intercepted method so that, once it
    // completes, an entry span describing the consumed messages is created and closed.
    // Returns: the dependent future produced by whenCompleteAsync (or ret unchanged when it is null).

    // Nothing to decorate when no future was produced; hand the value back untouched instead of
    // dereferencing null.
    if (ret == null) {
        return ret;
    }
    CompletableFuture<List<MessageView>> futureList = (CompletableFuture<List<MessageView>>) ret;

    // Capture the tracing context of the calling thread (if one is active) so that the asynchronous
    // callback below can continue it.
    ContextSnapshot capture = null;
    if (ContextManager.isActive()) {
        capture = ContextManager.capture();
    }
    final ContextSnapshot finalCapture = capture;

    return futureList.whenCompleteAsync((messages, throwable) -> {
        // messages is null whenever the future completed exceptionally (and may be null on a
        // successful completion as well), so it must not be dereferenced unconditionally.
        String topics = messages == null
            ? ""
            : messages.stream().map(MessageView::getTopic).distinct().collect(Collectors.joining(","));
        AbstractSpan span = ContextManager.createEntrySpan(
            CONSUMER_OPERATION_NAME_PREFIX + topics + "/Consumer", null);
        try {
            if (finalCapture != null) {
                ContextManager.continued(finalCapture);
            }
            if (null != throwable) {
                // Record the failure on the span; there are no messages to describe.
                span.log(throwable);
                span.errorOccurred();
                return;
            }
            if (messages == null || messages.isEmpty()) {
                return;
            }
            String namesrvAddr = "";
            Object skyWalkingDynamicField = objInst.getSkyWalkingDynamicField();
            if (skyWalkingDynamicField != null) {
                ConsumerEnhanceInfos consumerEnhanceInfos = (ConsumerEnhanceInfos) skyWalkingDynamicField;
                namesrvAddr = consumerEnhanceInfos.getNamesrvAddr();
            }
            SpanLayer.asMQ(span);
            Tags.MQ_BROKER.set(span, namesrvAddr);
            Tags.MQ_TOPIC.set(span, topics);
            span.setPeer(namesrvAddr);
            span.setComponent(ComponentsDefine.ROCKET_MQ_CONSUMER);
            // Extract the context carrier built from every received message into the current context.
            for (MessageView messageView : messages) {
                ContextCarrier contextCarrier = getContextCarrierFromMessage(messageView);
                ContextManager.extract(contextCarrier);
            }
        } finally {
            // Single exit point: the span created above is always stopped, even when one of the
            // steps in the try block throws.
            ContextManager.stopSpan();
        }
    });
}