in apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendInterceptor.java [59:112]
/**
 * Runs before the intercepted send: opens an MQ exit span for the outgoing message and
 * swaps {@code allArguments[0]} for a rebuilt copy of the message that additionally
 * carries the tracing context as message properties.
 *
 * <p>The original {@link Message} is only read; the copy is assembled through a fresh
 * {@code MessageBuilderImpl} from the original's topic, tag, keys, message group, body,
 * delivery timestamp and properties.
 *
 * @param objInst        the enhanced instance; cast to {@code ClientImpl} to read the endpoints
 * @param method         the intercepted method (not used here)
 * @param allArguments   intercepted arguments; index 0 must be the {@link Message} to send and
 *                       is replaced with the rebuilt message
 * @param argumentsTypes declared argument types (not used here)
 * @param result         intercept result holder (not used here)
 * @throws Throwable if any of the invoked span, carrier or message-builder calls fails
 */
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
    Message message = (Message) allArguments[0];
    ClientImpl producerImpl = (ClientImpl) objInst;
    String topic = message.getTopic();

    ContextCarrier contextCarrier = new ContextCarrier();
    String namingServiceAddress = producerImpl.getClientConfiguration().getEndpoints();
    AbstractSpan span = ContextManager.createExitSpan(buildOperationName(topic), contextCarrier, namingServiceAddress);
    span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
    Tags.MQ_BROKER.set(span, namingServiceAddress);
    Tags.MQ_TOPIC.set(span, topic);
    if (RocketMqClientJavaPluginConfig.Plugin.Rocketmqclient.COLLECT_MESSAGE_KEYS) {
        Collection<String> keys = message.getKeys();
        if (!CollectionUtil.isEmpty(keys)) {
            span.tag(MQ_MESSAGE_KEYS, keys.stream().collect(Collectors.joining(",")));
        }
    }
    if (RocketMqClientJavaPluginConfig.Plugin.Rocketmqclient.COLLECT_MESSAGE_TAGS) {
        Optional<String> tag = message.getTag();
        if (tag.isPresent()) {
            span.tag(MQ_MESSAGE_TAGS, tag.get());
        }
    }
    // Must run before the carrier items are read below so the timestamp is part of them.
    contextCarrier.extensionInjector().injectSendingTimestamp();
    SpanLayer.asMQ(span);

    // Rebuild the message instead of modifying the one handed in by the caller.
    MessageBuilder messageBuilder = new MessageBuilderImpl();
    messageBuilder.setTopic(topic);
    if (message.getTag().isPresent()) {
        messageBuilder.setTag(message.getTag().get());
    }
    messageBuilder.setKeys(message.getKeys().toArray(new String[0]));
    if (message.getMessageGroup().isPresent()) {
        messageBuilder.setMessageGroup(message.getMessageGroup().get());
    }

    // Copy exactly the readable bytes from a duplicate view: sizing by limit() and reading
    // from a second getBody() call is only correct when every call yields a fresh buffer
    // positioned at 0, and would otherwise underflow or consume the caller's buffer.
    java.nio.ByteBuffer bodyView = message.getBody().duplicate();
    byte[] body = new byte[bodyView.remaining()];
    bodyView.get(body);
    messageBuilder.setBody(body);

    if (message.getDeliveryTimestamp().isPresent()) {
        messageBuilder.setDeliveryTimestamp(message.getDeliveryTimestamp().get());
    }

    // User properties first, tracing headers second, so a tracing header wins on a key
    // clash. Writing straight into the builder avoids mutating the map returned by the
    // message, which may be unmodifiable or shared.
    Map<String, String> originalProperties = message.getProperties();
    originalProperties.forEach(messageBuilder::addProperty);
    CarrierItem next = contextCarrier.items();
    while (next.hasNext()) {
        next = next.next();
        if (!StringUtil.isEmpty(next.getHeadValue())) {
            messageBuilder.addProperty(next.getHeadKey(), next.getHeadValue());
        }
    }

    allArguments[0] = messageBuilder.build();
}