in flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java [554:571]
/**
 * Converts the raw bytes of a record into a Flume {@link Event}.
 *
 * <p>When {@code parseAsFlumeEvent} is {@code true}, {@code value} is decoded as a
 * binary-encoded {@code AvroFlumeEvent} and both its body and its headers are carried
 * over to the returned event. Otherwise {@code value} is used verbatim as the event body
 * and the event is built with an empty header map.
 *
 * <p>Side effects: on the Avro path this method reassigns the {@code decoder} field
 * (handing the previous decoder back to the factory as the reuse candidate) and
 * initializes the {@code reader} field the first time it is needed.
 * NOTE(review): both fields are declared outside this method; presumably each instance
 * is used by a single thread only -- confirm against the enclosing class.
 *
 * @param value the serialized payload to convert
 * @param parseAsFlumeEvent {@code true} to decode {@code value} as an Avro
 *     {@code AvroFlumeEvent}, {@code false} to wrap it unchanged
 * @return the event built from {@code value}
 * @throws IOException if reading the Avro-encoded payload fails
 */
private Event deserializeValue(byte[] value, boolean parseAsFlumeEvent) throws IOException {
  if (!parseAsFlumeEvent) {
    // Typed factory instead of the raw Collections.EMPTY_MAP constant: same empty map,
    // but no unchecked conversion at the call site.
    return EventBuilder.withBody(value, Collections.<String, String>emptyMap());
  }

  ByteArrayInputStream in = new ByteArrayInputStream(value);
  decoder = DecoderFactory.get().directBinaryDecoder(in, decoder);
  if (!reader.isPresent()) {
    reader = Optional.of(new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class));
  }
  AvroFlumeEvent event = reader.get().read(null, decoder);

  // ByteBuffer.array() exposes the entire backing array regardless of the buffer's
  // position/limit/offset, so only hand it over directly when the buffer spans the whole
  // array. In every other case (sliced, partially consumed, over-sized, read-only or
  // direct buffer) copy exactly the readable bytes instead.
  java.nio.ByteBuffer body = event.getBody();
  byte[] bodyBytes;
  if (body.hasArray()
      && body.arrayOffset() == 0
      && body.position() == 0
      && body.remaining() == body.array().length) {
    bodyBytes = body.array();
  } else {
    bodyBytes = new byte[body.remaining()];
    // duplicate() keeps the original buffer's position untouched.
    body.duplicate().get(bodyBytes);
  }
  return EventBuilder.withBody(bodyBytes, toStringMap(event.getHeaders()));
}