in src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java [129:159]
public void put(Collection<SinkRecord> sinkRecords) {
    // Handles one batch of sink records: every record is submitted through
    // addUserRecord and the shared callback is attached to the returned future.

    // Runs before anything is submitted.
    // NOTE(review): presumably rethrows a failure recorded by an earlier put - confirm.
    checkForEarlierPutException();

    // If the KinesisProducers cannot write to Kinesis Streams (connectivity
    // issues, access issues or misconfigured shards), consumption of messages
    // is paused until the backlog is cleared.
    validateOutStandingRecords();

    for (SinkRecord record : sinkRecords) {
        // Kinesis does not allow an empty partition key: use the trimmed record
        // key when it is non-blank, otherwise the Kafka partition number.
        final Object rawKey = record.key();
        final String trimmedKey = (rawKey == null) ? "" : rawKey.toString().trim();
        final String partitionKey = trimmedKey.isEmpty()
                ? Integer.toString(record.kafkaPartition())
                : trimmedKey;

        // With one producer per partition, the producer is looked up under
        // "<kafkaPartition>@<topic>"; otherwise the single shared producer is used.
        final ListenableFuture<UserRecordResult> pending = addUserRecord(
                singleKinesisProducerPerPartition
                        ? producerMap.get(record.kafkaPartition() + "@" + record.topic())
                        : kinesisProducer,
                streamName,
                partitionKey,
                usePartitionAsHashKey,
                record);

        // directExecutor: the callback runs on whichever thread completes the future.
        Futures.addCallback(pending, callback, MoreExecutors.directExecutor());
    }
}