in src/main/java/com/amazonaws/kda/flink/benchmarking/util/KDSProducerUtil.java [129:174]
/**
 * Writes the given messages to a Kinesis stream with {@code PutRecords}, in
 * batches of at most 500 records.
 *
 * <p>Each message is encoded as UTF-8 and routed with an explicit hash key taken
 * from {@code hashKeyIterator}; one key is consumed per message. When a response
 * reports failed records, only the failed entries are re-sent after a constant
 * 100 ms pause, and this repeats until the response reports no failures. The
 * retry loop has no upper bound on the number of attempts.
 *
 * <p>An exception raised while sending a batch is printed to standard output and
 * the method moves on to the next batch. If the thread is interrupted during the
 * retry pause, the interrupt flag is restored and the method returns without
 * sending the remaining batches.
 *
 * @param kinesis         client used to issue the {@code putRecords} calls
 * @param streamName      name of the target stream
 * @param recordList      messages to write, one Kinesis record per element
 * @param hashKeyIterator source of explicit hash keys; {@code next()} is called
 *                        once per message, outside the try block, so any
 *                        exception it throws propagates to the caller
 */
public static void writeMessagesToKinesis(AmazonKinesis kinesis, String streamName, List<String> recordList, Iterator<String> hashKeyIterator) {
    final PutRecordsRequest putRecsReq = new PutRecordsRequest();
    // The stream name is the same for every batch, so set it once.
    putRecsReq.setStreamName(streamName);

    // Make sure you write messages in a batch of 500 messages
    for (List<String> smallerList : Lists.partition(recordList, 500)) {
        // Entries still waiting to be accepted; shrinks to the failed subset on each retry.
        List<PutRecordsRequestEntry> pendingEntries = new ArrayList<>(smallerList.size());
        for (String message : smallerList) {
            final PutRecordsRequestEntry putRecsReqEntry = new PutRecordsRequestEntry();
            // Pin the charset so the payload bytes do not depend on the host's default encoding.
            putRecsReqEntry.setData(ByteBuffer.wrap(message.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
            putRecsReqEntry.setPartitionKey("reqiredButHasNoEffect-when-setExplicitHashKey-isUsed");
            putRecsReqEntry.setExplicitHashKey(hashKeyIterator.next());
            pendingEntries.add(putRecsReqEntry);
        }

        try {
            putRecsReq.setRecords(pendingEntries);
            PutRecordsResult putRecsRes = kinesis.putRecords(putRecsReq);
            while (putRecsRes.getFailedRecordCount() > 0) {
                System.out.println("Processing rejected records");
                // TODO: For simplicity, the backoff implemented as a constant 100ms sleep
                // For production-grade, consider using CoralRetry's Exponential Jittered
                // Backoff retry strategy
                // Ref:
                // https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
                Thread.sleep(100);

                // Relies on result entry i describing request entry i of the call just made.
                final List<PutRecordsResultEntry> resultEntries = putRecsRes.getRecords();
                final List<PutRecordsRequestEntry> failedEntries = new ArrayList<>();
                for (int i = 0; i < resultEntries.size(); i++) {
                    if (resultEntries.get(i).getErrorCode() != null) {
                        failedEntries.add(pendingEntries.get(i));
                    }
                }

                pendingEntries = failedEntries;
                putRecsReq.setRecords(pendingEntries);
                putRecsRes = kinesis.putRecords(putRecsReq);
            } // end of while loop
            System.out.println("Number of messages written: " + smallerList.size());
        } catch (InterruptedException e) {
            System.out.println("Exception in Kinesis Batch Insert: " + e.getMessage());
            // Restore the interrupt flag for the caller and stop sending further batches.
            Thread.currentThread().interrupt();
            return;
        } catch (Exception e) {
            // Best effort: report the failure and continue with the next batch.
            System.out.println("Exception in Kinesis Batch Insert: " + e.getMessage());
        }
    }
}