in apm-agent-plugins/apm-kafka-plugin/apm-kafka-headers-plugin/src/main/java/co/elastic/apm/agent/kafka/helper/ConsumerRecordsIteratorWrapper.java [71:108]
public ConsumerRecord<?, ?> next() {
endCurrentTransaction();
ConsumerRecord<?, ?> record = delegate.next();
try {
String topic = record.topic();
if (!WildcardMatcher.isAnyMatch(messagingConfiguration.getIgnoreMessageQueues(), topic)) {
Transaction<?> transaction = tracer.startChildTransaction(record, KafkaRecordHeaderAccessor.instance(), PrivilegedActionUtils.getClassLoader(ConsumerRecordsIteratorWrapper.class));
if (transaction != null) {
transaction.withType("messaging").withName("Kafka record from " + topic).activate();
transaction.setFrameworkName(FRAMEWORK_NAME);
Message message = transaction.getContext().getMessage();
message.withQueue(topic);
if (record.timestampType() == TimestampType.CREATE_TIME) {
message.withAge(System.currentTimeMillis() - record.timestamp());
}
if (transaction.isSampled() && coreConfiguration.isCaptureHeaders()) {
for (Header header : record.headers()) {
String key = header.key();
if (!tracer.getTraceHeaderNames().contains(key) &&
WildcardMatcher.anyMatch(coreConfiguration.getSanitizeFieldNames(), key) == null) {
message.addHeader(key, header.value());
}
}
}
if (transaction.isSampled() && coreConfiguration.getCaptureBody() != CoreConfiguration.EventType.OFF) {
message.appendToBody("key=").appendToBody(String.valueOf(record.key())).appendToBody("; ")
.appendToBody("value=").appendToBody(String.valueOf(record.value()));
}
}
}
} catch (Exception e) {
logger.error("Error in transaction creation based on Kafka record", e);
}
return record;
}