in flume-legacy-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java [124:147]
/**
 * Converts a Flume OG Avro event into a Flume event and passes it to the
 * channel processor.
 *
 * <p>The OG host, timestamp, priority and nanos values, together with every
 * entry of the OG field map, are copied into the new event's headers, and the
 * {@code OG_EVENT} header is set to {@code "yes"}. The body is a copy of the
 * readable bytes of the OG event body.
 *
 * <p>Counters: {@code rpc.received} is incremented for every call,
 * {@code rpc.events} and {@code rpc.successful} only when the channel
 * processor accepts the event. A {@link ChannelException} is not propagated
 * to the caller.
 *
 * @param evt the Flume OG event to convert and forward
 */
public void append(AvroFlumeOGEvent evt) {
  counterGroup.incrementAndGet("rpc.received");
  Map<String, String> headers = new HashMap<String, String>();

  // extract Flume OG event headers
  headers.put(HOST, evt.getHost().toString());
  headers.put(TIMESTAMP, Long.toString(evt.getTimestamp()));
  headers.put(PRIORITY, evt.getPriority().toString());
  headers.put(NANOS, Long.toString(evt.getNanos()));

  // ByteBuffer.toString() only describes the buffer (position/limit/capacity),
  // so the field bytes have to be decoded to obtain the actual value.
  // NOTE(review): UTF-8 is assumed for OG field values -- confirm against the
  // producers; malformed input is replaced rather than rejected.
  java.nio.charset.Charset fieldCharset = java.nio.charset.Charset.forName("UTF-8");
  for (Entry<CharSequence, ByteBuffer> entry : evt.getFields().entrySet()) {
    // Decode a duplicate so the position of the event's own buffer is untouched.
    String value = fieldCharset.decode(entry.getValue().duplicate()).toString();
    headers.put(entry.getKey().toString(), value);
  }
  headers.put(OG_EVENT, "yes");

  // Copy exactly the readable bytes. ByteBuffer.array() would expose the whole
  // backing array regardless of position/limit/offset, and it throws for
  // read-only or direct buffers.
  ByteBuffer bodyView = evt.getBody().duplicate();
  byte[] body = new byte[bodyView.remaining()];
  bodyView.get(body);

  Event event = EventBuilder.withBody(body, headers);
  try {
    getChannelProcessor().processEvent(event);
    counterGroup.incrementAndGet("rpc.events");
  } catch (ChannelException ex) {
    // Deliberately not rethrown: the failed delivery shows up as the
    // difference between "rpc.received" and "rpc.successful".
    return;
  }

  counterGroup.incrementAndGet("rpc.successful");
}