in src/main/java/com/amazon/kinesis/kafka/FirehoseSinkTask.java [198:213]
private void putRecords(Collection<SinkRecord> sinkRecords) {
for (SinkRecord sinkRecord : sinkRecords) {
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setDeliveryStreamName(deliveryStreamName);
putRecordRequest.setRecord(DataUtility.createRecord(sinkRecord));
try {
firehoseClient.putRecord(putRecordRequest);
}catch(AmazonKinesisFirehoseException akfe){
System.out.println("Amazon Kinesis Firehose Exception:" + akfe.getLocalizedMessage());
}catch(Exception e){
System.out.println("Connector Exception" + e.getLocalizedMessage());
}
}
}