in flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java [209:352]
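// Reads a batch of records from Kafka, converts each record into a Flume event,
// hands the batch to the channel processor, and then commits the consumed offsets.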
protected Status doProcess() throws EventDeliveryException {
final String batchUUID = UUID.randomUUID().toString();
String kafkaKey;
Event event;
byte[] eventBody;
try {
// prepare time variables for new batch
final long nanoBatchStartTime = System.nanoTime();
final long batchStartTime = System.currentTimeMillis();
final long maxBatchEndTime = System.currentTimeMillis() + maxBatchDurationMillis;
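// Keep adding events until the batch size limit is reached or the batch time window expires.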
while (eventList.size() < batchUpperLimit &&
System.currentTimeMillis() < maxBatchEndTime) {
if (it == null || !it.hasNext()) {
// Obtaining new records
// Poll for at most the time remaining in the current batch window.
long durMs = Math.max(0L, maxBatchEndTime - System.currentTimeMillis());
Duration duration = Duration.ofMillis(durMs);
ConsumerRecords<String, byte[]> records = consumer.poll(duration);
it = records.iterator();
// rebalanceFlag is set to true by a rebalance callback when partitions are revoked.
// Break out so any events gathered so far are delivered and their offsets are
// committed before the partitions are reassigned.
if (rebalanceFlag.compareAndSet(true, false)) {
break;
}
// check records after poll
if (!it.hasNext()) {
counter.incrementKafkaEmptyCount();
log.debug("Returning with backoff. No more data to read");
// no records arrived within the remaining batch time
break;
}
}
// get next message
ConsumerRecord<String, byte[]> message = it.next();
kafkaKey = message.key();
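// The record value is either an Avro-serialized AvroFlumeEvent (body plus headers)
// or is used directly as the event body, depending on useAvroEventFormat.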
if (useAvroEventFormat) {
//Assume the event is in Avro format using the AvroFlumeEvent schema
//Will need to catch the exception if it is not
ByteArrayInputStream in =
new ByteArrayInputStream(message.value());
decoder = DecoderFactory.get().directBinaryDecoder(in, decoder);
if (!reader.isPresent()) {
reader = Optional.of(
new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class));
}
//This may throw an exception but it will be caught by the
//exception handler below and logged at error
AvroFlumeEvent avroevent = reader.get().read(null, decoder);
eventBody = avroevent.getBody().array();
headers = toStringMap(avroevent.getHeaders());
} else {
eventBody = message.value();
headers.clear();
headers = new HashMap<String, String>(4);
}
// Add headers to the event (timestamp, topic, partition, offset, key) only if they are not already present
if (!headers.containsKey(TIMESTAMP_HEADER)) {
headers.put(TIMESTAMP_HEADER, String.valueOf(message.timestamp()));
}
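// Map configured Kafka record headers onto Flume event headers
// (headerMap key: Flume header name, value: Kafka header key).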
if (!headerMap.isEmpty()) {
Headers kafkaHeaders = message.headers();
for (Map.Entry<String, String> entry : headerMap.entrySet()) {
for (Header kafkaHeader : kafkaHeaders.headers(entry.getValue())) {
headers.put(entry.getKey(), new String(kafkaHeader.value()));
}
}
}
// Only set the topic header if setTopicHeader is enabled and it isn't already populated
if (setTopicHeader && !headers.containsKey(topicHeader)) {
headers.put(topicHeader, message.topic());
}
if (!headers.containsKey(PARTITION_HEADER)) {
headers.put(PARTITION_HEADER, String.valueOf(message.partition()));
}
if (!headers.containsKey(OFFSET_HEADER)) {
headers.put(OFFSET_HEADER, String.valueOf(message.offset()));
}
if (kafkaKey != null) {
headers.put(KEY_HEADER, kafkaKey);
}
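// Trace-level logging: the raw message body is only logged when log privacy rules allow it.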
if (log.isTraceEnabled()) {
if (LogPrivacyUtil.allowLogRawData()) {
log.trace("Topic: {} Partition: {} Message: {}", new String[]{
message.topic(),
String.valueOf(message.partition()),
new String(eventBody)
});
} else {
log.trace("Topic: {} Partition: {} Message arrived.",
message.topic(),
String.valueOf(message.partition()));
}
}
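// Wrap the record body and headers in a Flume event and add it to the current batch.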
event = EventBuilder.withBody(eventBody, headers);
eventList.add(event);
if (log.isDebugEnabled()) {
log.debug("Waited: {} ", System.currentTimeMillis() - batchStartTime);
log.debug("Event #: {}", eventList.size());
}
// For each partition store next offset that is going to be read.
tpAndOffsetMetadata.put(new TopicPartition(message.topic(), message.partition()),
new OffsetAndMetadata(message.offset() + 1, batchUUID));
}
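// Deliver the accumulated batch to the channel and commit the consumed offsets.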
if (eventList.size() > 0) {
counter.addToKafkaEventGetTimer((System.nanoTime() - nanoBatchStartTime) / (1000 * 1000));
counter.addToEventReceivedCount((long) eventList.size());
getChannelProcessor().processEventBatch(eventList);
counter.addToEventAcceptedCount(eventList.size());
if (log.isDebugEnabled()) {
log.debug("Wrote {} events to channel", eventList.size());
}
eventList.clear();
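// Offsets are committed synchronously only after the channel has accepted the batch
// (at-least-once delivery).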
if (!tpAndOffsetMetadata.isEmpty()) {
long commitStartTime = System.nanoTime();
consumer.commitSync(tpAndOffsetMetadata);
long commitEndTime = System.nanoTime();
counter.addToKafkaCommitTimer((commitEndTime - commitStartTime) / (1000 * 1000));
tpAndOffsetMetadata.clear();
}
return Status.READY;
}
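// No events were read in this cycle; ask the runner to back off before polling again.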
return Status.BACKOFF;
} catch (Exception e) {
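// Poll, Avro decode, and channel failures all land here; count the failure and back off.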
log.error("KafkaSource EXCEPTION", e);
counter.incrementEventReadOrChannelFail(e);
return Status.BACKOFF;
}
}
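The rebalanceFlag consumed above via compareAndSet(true, false) is raised by a rebalance callback registered when the consumer subscribes to its topics; that callback lives elsewhere in KafkaSource.java and is outside this excerpt. The following is a minimal illustrative sketch of such a listener, assuming a shared AtomicBoolean, and is not the file's actual implementation.

// Illustrative sketch only: a ConsumerRebalanceListener that raises a shared flag
// when partitions are revoked, so a poll loop like the one above can stop, flush
// its batch, and commit offsets before the partitions are reassigned.
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

class FlagOnRevokeListener implements ConsumerRebalanceListener {
  private final AtomicBoolean rebalanceFlag;

  FlagOnRevokeListener(AtomicBoolean rebalanceFlag) {
    this.rebalanceFlag = rebalanceFlag;
  }

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    rebalanceFlag.set(true); // signal the consuming loop that a rebalance started
  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // nothing to do; polling resumes with the new assignment
  }
}

A listener of this shape would be passed to consumer.subscribe(topics, listener); the compareAndSet in doProcess then consumes the signal at most once per rebalance.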