in flume-legacy-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java [75:102]
/**
 * Converts a legacy Thrift event into a Flume {@code Event} and hands it to
 * the channel processor.
 *
 * <p>The host, timestamp, priority and nanos of the incoming event, plus each
 * of its fields (decoded with {@code UTF_8}), are copied into the headers of
 * the new event, and the {@code OG_EVENT} header is set to {@code "yes"}.
 * The {@code "rpc.events"} counter is incremented for every converted event;
 * {@code "rpc.successful"} is incremented only when the channel processor
 * accepts it. A {@code ChannelException} is logged and not rethrown.
 *
 * @param evt the incoming event; a {@code null} event is ignored
 */
public void append(ThriftFlumeEvent evt) {
  if (evt == null) {
    return;
  }

  Map<String, String> headers = new HashMap<String, String>();
  // extract Flume event headers
  headers.put(HOST, evt.getHost());
  headers.put(TIMESTAMP, Long.toString(evt.getTimestamp()));
  // Only add the priority header when a priority is present; calling
  // toString() on a missing priority would abort the whole append.
  if (evt.getPriority() != null) {
    headers.put(PRIORITY, evt.getPriority().toString());
  }
  headers.put(NANOS, Long.toString(evt.getNanos()));

  // The fields map may be absent; treat that the same as an empty map.
  Map<String, ByteBuffer> fields = evt.getFields();
  if (fields != null) {
    for (Entry<String, ByteBuffer> entry : fields.entrySet()) {
      String key = entry.getKey();
      ByteBuffer value = entry.getValue();
      if (key == null || value == null) {
        // Nothing usable to copy for this entry.
        continue;
      }
      // Decode from a duplicate so the position of the event's own buffer
      // is left untouched.
      headers.put(key, UTF_8.decode(value.duplicate()).toString());
    }
  }
  headers.put(OG_EVENT, "yes");

  Event event = EventBuilder.withBody(evt.getBody(), headers);
  counterGroup.incrementAndGet("rpc.events");
  try {
    getChannelProcessor().processEvent(event);
  } catch (ChannelException ex) {
    // Delivery failed: log it and leave "rpc.successful" unchanged.
    LOG.warn("Failed to process event", ex);
    return;
  }
  counterGroup.incrementAndGet("rpc.successful");
}