public static void writeMessagesToKinesis()

in src/main/java/com/amazonaws/kda/flink/benchmarking/util/KDSProducerUtil.java [129:174]


	public static void writeMessagesToKinesis(AmazonKinesis kinesis, String streamName, List<String> recordList, Iterator<String> hashKeyIterator) {
		PutRecordsRequest putRecsReq = new PutRecordsRequest();
		List<PutRecordsRequestEntry> putRecReqEntryList = new ArrayList<PutRecordsRequestEntry>();
		PutRecordsResult putRecsRes = new PutRecordsResult();
		// Make sure you write messages in a batch of 500 messages
		List<List<String>> listofSmallerLists = Lists.partition(recordList, 500);
		for (List<String> smallerList : listofSmallerLists) {
			putRecReqEntryList.clear();
			for (String message : smallerList) {
				PutRecordsRequestEntry putRecsReqEntry = new PutRecordsRequestEntry();
				putRecsReqEntry.setData(ByteBuffer.wrap(message.getBytes()));
				putRecsReqEntry.setPartitionKey("reqiredButHasNoEffect-when-setExplicitHashKey-isUsed");
				putRecsReqEntry.setExplicitHashKey(hashKeyIterator.next());
				putRecReqEntryList.add(putRecsReqEntry);
			}
			try {
				putRecsReq.setStreamName(streamName);
				putRecsReq.setRecords(putRecReqEntryList);
				putRecsRes = kinesis.putRecords(putRecsReq);
				while (putRecsRes.getFailedRecordCount() > 0) {
					System.out.println("Processing rejected records");
					// TODO: For simplicity, the backoff implemented as a constant 100ms sleep
					// For production-grade, consider using CoralRetry's Exponential Jittered
					// Backoff retry strategy
					// Ref:
					// https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
					Thread.sleep(100);
					final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<PutRecordsRequestEntry>();
					final List<PutRecordsResultEntry> putRecsResEntryList = putRecsRes.getRecords();
					for (int i = 0; i < putRecsResEntryList.size(); i++) {
						final PutRecordsRequestEntry putRecordReqEntry = putRecReqEntryList.get(i);
						final PutRecordsResultEntry putRecordsResEntry = putRecsResEntryList.get(i);
						if (putRecordsResEntry.getErrorCode() != null) {
							failedRecordsList.add(putRecordReqEntry);
						}
					}
					putRecReqEntryList = failedRecordsList;
					putRecsReq.setRecords(putRecReqEntryList);
					putRecsRes = kinesis.putRecords(putRecsReq);
				} // end of while loop
				System.out.println("Number of messages written: " + smallerList.size());
			} catch (Exception e) {
				System.out.println("Exception in Kinesis Batch Insert: " + e.getMessage());
			}
		}
	}