public void beforeMethod()

in apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java [43:88]


    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {

        ContextCarrier contextCarrier = new ContextCarrier();

        ProducerRecord record = (ProducerRecord) allArguments[0];
        String topicName = record.topic();
        AbstractSpan activeSpan = ContextManager.createExitSpan(OPERATE_NAME_PREFIX + topicName + PRODUCER_OPERATE_NAME_SUFFIX, contextCarrier, (String) objInst
                .getSkyWalkingDynamicField());

        Tags.MQ_BROKER.set(activeSpan, (String) objInst.getSkyWalkingDynamicField());
        Tags.MQ_TOPIC.set(activeSpan, topicName);
        contextCarrier.extensionInjector().injectSendingTimestamp();
        SpanLayer.asMQ(activeSpan);
        activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER);

        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            record.headers().add(next.getHeadKey(), next.getHeadValue().getBytes(StandardCharsets.UTF_8));
        }

        //when use lambda expression, not to generate inner class,
        //    and not to trigger kafka CallBack class define, so allArguments[1] can't to cast EnhancedInstance
        Object shouldCallbackInstance = allArguments[1];
        if (null != shouldCallbackInstance) {
            if (shouldCallbackInstance instanceof EnhancedInstance) {
                EnhancedInstance callbackInstance = (EnhancedInstance) shouldCallbackInstance;
                ContextSnapshot snapshot = ContextManager.capture();
                if (null != snapshot) {
                    CallbackCache cache = new CallbackCache();
                    cache.setSnapshot(snapshot);
                    callbackInstance.setSkyWalkingDynamicField(cache);
                }
            } else if (shouldCallbackInstance instanceof Callback) {
                Callback callback = (Callback) shouldCallbackInstance;
                ContextSnapshot snapshot = ContextManager.capture();
                if (null != snapshot) {
                    CallbackCache cache = new CallbackCache();
                    cache.setSnapshot(snapshot);
                    cache.setCallback(callback);
                    allArguments[1] = new CallbackAdapterInterceptor(cache);
                }
            }
        }
    }