in flume-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java [139:308]
public Status process() throws EventDeliveryException {
Status result = Status.READY;
Channel channel = getChannel();
Transaction transaction = null;
Event event = null;
String eventTopic = null;
String eventKey = null;
try {
long processedEvents = 0;
transaction = channel.getTransaction();
transaction.begin();
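// If Kafka transactions are enabled, open a producer transaction alongside the channel transaction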
if (useKafkaTransactions) {
producer.beginTransaction();
}
kafkaFutures.clear();
long batchStartTime = System.nanoTime();
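// Take up to batchSize events from the channel, turning each into a Kafka record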
for (; processedEvents < batchSize; processedEvents += 1) {
event = channel.take();
if (event == null) {
// no events available in channel
if (processedEvents == 0) {
result = Status.BACKOFF;
counter.incrementBatchEmptyCount();
} else {
counter.incrementBatchUnderflowCount();
}
break;
}
counter.incrementEventDrainAttemptCount();
byte[] eventBody = event.getBody();
Map<String, String> headers = event.getHeaders();
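// Resolve the destination topic: a per-event header may override the configured topic when allowed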
if (allowTopicOverride) {
eventTopic = headers.get(topicHeader);
if (eventTopic == null) {
eventTopic = BucketPath.escapeString(topic, event.getHeaders());
logger.debug("{} was set to true but header {} was null. Producing to {}" +
" topic instead.",
new Object[]{KafkaSinkConstants.ALLOW_TOPIC_OVERRIDE_HEADER,
topicHeader, eventTopic});
}
} else {
eventTopic = topic;
}
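// The record key, if present, is taken from the KEY_HEADER event header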
eventKey = headers.get(KafkaSinkConstants.KEY_HEADER);
if (logger.isTraceEnabled()) {
if (LogPrivacyUtil.allowLogRawData()) {
logger.trace("{Event} " + eventTopic + " : " + eventKey + " : "
+ new String(eventBody, StandardCharsets.UTF_8));
} else {
logger.trace("{Event} " + eventTopic + " : " + eventKey);
}
}
logger.debug("event #{}", processedEvents);
// create a message and add to buffer
long startTime = System.currentTimeMillis();
Integer partitionId = null;
try {
ProducerRecord<String, byte[]> record;
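// Start from the statically configured partition id, if one was set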
if (staticPartitionId != null) {
partitionId = staticPartitionId;
}
// Allow a per-event header to override the static partition id
if (partitionHeader != null) {
String headerVal = event.getHeaders().get(partitionHeader);
if (headerVal != null) {
partitionId = Integer.parseInt(headerVal);
}
}
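// Optionally take the record timestamp from a configured header; unparseable values are logged and ignored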
Long timestamp = null;
if (timestampHeader != null) {
String value = headers.get(timestampHeader);
if (value != null) {
try {
timestamp = Long.parseLong(value);
} catch (Exception ex) {
logger.warn("Invalid timestamp in header {} - {}", timestampHeader, value);
}
}
}
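// Map configured Flume headers onto Kafka record headers (values encoded as UTF-8)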
List<Header> kafkaHeaders = null;
if (!headerMap.isEmpty()) {
List<Header> tempHeaders = new ArrayList<>();
for (Map.Entry<String, String> entry : headerMap.entrySet()) {
String value = headers.get(entry.getKey());
if (value != null) {
tempHeaders.add(new RecordHeader(entry.getValue(),
value.getBytes(StandardCharsets.UTF_8)));
}
}
if (!tempHeaders.isEmpty()) {
kafkaHeaders = tempHeaders;
}
}
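// Build the record; a null partition id lets Kafka's partitioner choose, and timestamp/headers may also be null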
record = new ProducerRecord<>(eventTopic, partitionId, timestamp, eventKey,
serializeEvent(event, useAvroEventFormat), kafkaHeaders);
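// send() is asynchronous; collect the future so the whole batch can be confirmed before the channel commit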
kafkaFutures.add(producer.send(record, new SinkCallback(startTime)));
} catch (NumberFormatException ex) {
throw new EventDeliveryException("Non integer partition id specified", ex);
} catch (Exception ex) {
// N.B. The producer.send() method throws all sorts of RuntimeExceptions
// Catching Exception here to wrap them neatly in an EventDeliveryException
// which is what our consumers will expect
throw new EventDeliveryException("Could not send event", ex);
}
}
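// Confirm delivery for the whole batch: commit the Kafka transaction, or flush and wait on every send future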
if (useKafkaTransactions) {
producer.commitTransaction();
} else {
// Flush eagerly so that linger.ms cannot hold the batch back
producer.flush();
for (Future<RecordMetadata> future : kafkaFutures) {
future.get();
}
}
// The batch has been published; record timing and drain counters, then commit the channel transaction.
if (processedEvents > 0) {
long endTime = System.nanoTime();
counter.addToKafkaEventSendTimer((endTime - batchStartTime) / (1000 * 1000));
counter.addToEventDrainSuccessCount(processedEvents);
}
transaction.commit();
} catch (Exception ex) {
String errorMsg = "Failed to publish events";
logger.error("Failed to publish events", ex);
counter.incrementEventWriteOrChannelFail(ex);
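// Abort the Kafka transaction if one is open, then roll back the channel so the events can be redelivered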
if (transaction != null) {
try {
kafkaFutures.clear();
try {
if (useKafkaTransactions) {
producer.abortTransaction();
}
} catch (ProducerFencedException e) {
logger.error("Could not rollback transaction as producer fenced", e);
} finally {
transaction.rollback();
counter.incrementRollbackCount();
}
} catch (Exception e) {
logger.error("Transaction rollback failed", e);
throw Throwables.propagate(e);
}
}
throw new EventDeliveryException(errorMsg, ex);
} finally {
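// Always close the channel transaction, whether the batch succeeded or failed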
if (transaction != null) {
transaction.close();
}
}
return result;
}
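// For reference, a minimal sketch of what the SinkCallback passed to producer.send() above might
// look like. This is an illustration, not the implementation from this file: it assumes the callback
// only logs failures and, at debug level, the per-record send latency, and it assumes the standard
// Kafka client Callback/RecordMetadata and SLF4J imports are available at the top of the file.
static class SinkCallback implements Callback {
private static final Logger logger = LoggerFactory.getLogger(SinkCallback.class);
private final long startTime;

SinkCallback(long startTime) {
this.startTime = startTime;
}

@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
// The send failed; batch-level error handling in process() decides whether the events are retried
logger.debug("Error sending message to Kafka: {}", exception.getMessage());
}
if (logger.isDebugEnabled()) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (metadata != null) {
logger.debug("Acked message partition:{} offset:{}", metadata.partition(), metadata.offset());
}
logger.debug("Elapsed time for send: {}", elapsedTime);
}
}
}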