public ConsumerRecord next()

in apm-agent-plugins/apm-kafka-plugin/apm-kafka-headers-plugin/src/main/java/co/elastic/apm/agent/kafka/helper/ConsumerRecordsIteratorWrapper.java [71:108]


    public ConsumerRecord<?, ?> next() {
        endCurrentTransaction();
        ConsumerRecord<?, ?> record = delegate.next();
        try {
            String topic = record.topic();
            if (!WildcardMatcher.isAnyMatch(messagingConfiguration.getIgnoreMessageQueues(), topic)) {
                Transaction<?> transaction = tracer.startChildTransaction(record, KafkaRecordHeaderAccessor.instance(), PrivilegedActionUtils.getClassLoader(ConsumerRecordsIteratorWrapper.class));
                if (transaction != null) {
                    transaction.withType("messaging").withName("Kafka record from " + topic).activate();
                    transaction.setFrameworkName(FRAMEWORK_NAME);

                    Message message = transaction.getContext().getMessage();
                    message.withQueue(topic);
                    if (record.timestampType() == TimestampType.CREATE_TIME) {
                        message.withAge(System.currentTimeMillis() - record.timestamp());
                    }

                    if (transaction.isSampled() && coreConfiguration.isCaptureHeaders()) {
                        for (Header header : record.headers()) {
                            String key = header.key();
                            if (!tracer.getTraceHeaderNames().contains(key) &&
                                WildcardMatcher.anyMatch(coreConfiguration.getSanitizeFieldNames(), key) == null) {
                                message.addHeader(key, header.value());
                            }
                        }
                    }

                    if (transaction.isSampled() && coreConfiguration.getCaptureBody() != CoreConfiguration.EventType.OFF) {
                        message.appendToBody("key=").appendToBody(String.valueOf(record.key())).appendToBody("; ")
                            .appendToBody("value=").appendToBody(String.valueOf(record.value()));
                    }
                }
            }
        } catch (Exception e) {
            logger.error("Error in transaction creation based on Kafka record", e);
        }
        return record;
    }