in apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java [43:88]
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
ContextCarrier contextCarrier = new ContextCarrier();
ProducerRecord record = (ProducerRecord) allArguments[0];
String topicName = record.topic();
AbstractSpan activeSpan = ContextManager.createExitSpan(OPERATE_NAME_PREFIX + topicName + PRODUCER_OPERATE_NAME_SUFFIX, contextCarrier, (String) objInst
.getSkyWalkingDynamicField());
Tags.MQ_BROKER.set(activeSpan, (String) objInst.getSkyWalkingDynamicField());
Tags.MQ_TOPIC.set(activeSpan, topicName);
contextCarrier.extensionInjector().injectSendingTimestamp();
SpanLayer.asMQ(activeSpan);
activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER);
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
record.headers().add(next.getHeadKey(), next.getHeadValue().getBytes(StandardCharsets.UTF_8));
}
//when use lambda expression, not to generate inner class,
// and not to trigger kafka CallBack class define, so allArguments[1] can't to cast EnhancedInstance
Object shouldCallbackInstance = allArguments[1];
if (null != shouldCallbackInstance) {
if (shouldCallbackInstance instanceof EnhancedInstance) {
EnhancedInstance callbackInstance = (EnhancedInstance) shouldCallbackInstance;
ContextSnapshot snapshot = ContextManager.capture();
if (null != snapshot) {
CallbackCache cache = new CallbackCache();
cache.setSnapshot(snapshot);
callbackInstance.setSkyWalkingDynamicField(cache);
}
} else if (shouldCallbackInstance instanceof Callback) {
Callback callback = (Callback) shouldCallbackInstance;
ContextSnapshot snapshot = ContextManager.capture();
if (null != snapshot) {
CallbackCache cache = new CallbackCache();
cache.setSnapshot(snapshot);
cache.setCallback(callback);
allArguments[1] = new CallbackAdapterInterceptor(cache);
}
}
}
}