in apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java [53:95]
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
Message message = (Message) allArguments[2];
ContextCarrier contextCarrier = new ContextCarrier();
String namingServiceAddress = String.valueOf(objInst.getSkyWalkingDynamicField());
AbstractSpan span = ContextManager.createExitSpan(buildOperationName(message.getTopic()), contextCarrier, namingServiceAddress);
span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
Tags.MQ_BROKER.set(span, (String) allArguments[0]);
Tags.MQ_TOPIC.set(span, message.getTopic());
String keys = message.getKeys();
if (StringUtil.isNotBlank(keys)) {
span.tag(Tags.ofKey("mq.message.keys"), keys);
}
String tags = message.getTags();
if (StringUtil.isNotBlank(tags)) {
span.tag(Tags.ofKey("mq.message.tags"), tags);
}
contextCarrier.extensionInjector().injectSendingTimestamp();
SpanLayer.asMQ(span);
SendMessageRequestHeader requestHeader = (SendMessageRequestHeader) allArguments[3];
StringBuilder properties = new StringBuilder(requestHeader.getProperties());
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
if (!StringUtil.isEmpty(next.getHeadValue())) {
if (properties.length() > 0 && properties.charAt(properties.length() - 1) != PROPERTY_SEPARATOR) {
// adapt for RocketMQ 4.9.x or later
properties.append(PROPERTY_SEPARATOR);
}
properties.append(next.getHeadKey());
properties.append(NAME_VALUE_SEPARATOR);
properties.append(next.getHeadValue());
}
}
requestHeader.setProperties(properties.toString());
if (allArguments[6] != null) {
((EnhancedInstance) allArguments[6]).setSkyWalkingDynamicField(new SendCallBackEnhanceInfo(message.getTopic(), ContextManager
.capture()));
}
}