in tools/src/main/java/org/apache/kafka/tools/consumer/DefaultMessageFormatter.java [129:193]
public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
    // Prints the enabled fields of one record in a fixed order: timestamp, partition,
    // offset, delivery count, leader epoch, headers, key, value. After every field except
    // the value, writeSeparator is told whether any later field is still enabled; the flags
    // below answer that question once per position instead of repeating the OR chain.
    // NOTE(review): presumably writeSeparator emits a column separator when the flag is
    // true and ends the line otherwise -- confirm against its definition.
    final boolean moreAfterKey = printValue;
    final boolean moreAfterHeaders = printKey || moreAfterKey;
    final boolean moreAfterEpoch = printHeaders || moreAfterHeaders;
    final boolean moreAfterDelivery = printEpoch || moreAfterEpoch;
    final boolean moreAfterOffset = printDelivery || moreAfterDelivery;
    final boolean moreAfterPartition = printOffset || moreAfterOffset;
    final boolean moreAfterTimestamp = printPartition || moreAfterPartition;

    try {
        if (printTimestamp) {
            TimestampType tsType = consumerRecord.timestampType();
            String tsText = tsType == TimestampType.NO_TIMESTAMP_TYPE
                ? "NO_TIMESTAMP"
                : tsType + ":" + consumerRecord.timestamp();
            output.print(tsText);
            writeSeparator(output, moreAfterTimestamp);
        }

        if (printPartition) {
            output.print("Partition:");
            output.print(consumerRecord.partition());
            writeSeparator(output, moreAfterPartition);
        }

        if (printOffset) {
            output.print("Offset:");
            output.print(consumerRecord.offset());
            writeSeparator(output, moreAfterOffset);
        }

        if (printDelivery) {
            // An absent delivery count is rendered as the literal NOT_PRESENT.
            String deliveryText = consumerRecord.deliveryCount()
                .map(Object::toString)
                .orElse("NOT_PRESENT");
            output.print("Delivery:");
            output.print(deliveryText);
            writeSeparator(output, moreAfterDelivery);
        }

        if (printEpoch) {
            // An absent leader epoch is rendered as the literal NOT_PRESENT.
            String epochText = consumerRecord.leaderEpoch()
                .map(Object::toString)
                .orElse("NOT_PRESENT");
            output.print("Epoch:");
            output.print(epochText);
            writeSeparator(output, moreAfterEpoch);
        }

        if (printHeaders) {
            Iterator<Header> remaining = consumerRecord.headers().iterator();
            if (remaining.hasNext()) {
                // Each header is written as "key:" followed by its deserialized value;
                // headersSeparator goes between headers only, never after the last one.
                boolean another;
                do {
                    Header current = remaining.next();
                    output.print(current.key() + ":");
                    output.write(deserialize(consumerRecord, headersDeserializer, current.value(), consumerRecord.topic()));
                    another = remaining.hasNext();
                    if (another) {
                        output.write(headersSeparator);
                    }
                } while (another);
            } else {
                output.print("NO_HEADERS");
            }
            writeSeparator(output, moreAfterHeaders);
        }

        if (printKey) {
            byte[] keyBytes = deserialize(consumerRecord, keyDeserializer, consumerRecord.key(), consumerRecord.topic());
            output.write(keyBytes);
            writeSeparator(output, moreAfterKey);
        }

        if (printValue) {
            // The value is always the last field, so it is followed directly by lineSeparator.
            byte[] valueBytes = deserialize(consumerRecord, valueDeserializer, consumerRecord.value(), consumerRecord.topic());
            output.write(valueBytes);
            output.write(lineSeparator);
        }
    } catch (IOException ioe) {
        // A failed write is logged and not rethrown.
        LOG.error("Unable to write the consumer record to the output", ioe);
    }
}