in flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java [331:365]
protected void doPut(Event event) throws InterruptedException {
type = TransactionType.PUT;
if (!producerRecords.isPresent()) {
producerRecords = Optional.of(new LinkedList<ProducerRecord<String, byte[]>>());
}
String key = event.getHeaders().get(KEY_HEADER);
Integer partitionId = null;
try {
if (staticPartitionId != null) {
partitionId = staticPartitionId;
}
// Allow a specified header to override a static ID
if (partitionHeader != null) {
String headerVal = event.getHeaders().get(partitionHeader);
if (headerVal != null) {
partitionId = Integer.parseInt(headerVal);
}
}
if (partitionId != null) {
producerRecords.get().add(
new ProducerRecord<String, byte[]>(topic.get(), partitionId, key,
serializeValue(event, parseAsFlumeEvent)));
} else {
producerRecords.get().add(
new ProducerRecord<String, byte[]>(topic.get(), key,
serializeValue(event, parseAsFlumeEvent)));
}
counter.incrementEventPutAttemptCount();
} catch (NumberFormatException e) {
throw new ChannelException("Non integer partition id specified", e);
} catch (Exception e) {
throw new ChannelException("Error while serializing event", e);
}
}