in src/main/java/com/amazonaws/kda/flink/benchmarking/util/KDSProducerUtil.java [100:127]
public static void writeSingleMessageToKinesis(String record, String streamName, AmazonKinesis kinesis, String startingHashKey) {
PutRecordRequest putRecReq = new PutRecordRequest();
try {
putRecReq.setStreamName(streamName);
putRecReq.setData(ByteBuffer.wrap(record.getBytes()));
putRecReq.setExplicitHashKey(startingHashKey);
putRecReq.setPartitionKey("reqiredButHasNoEffect-when-setExplicitHashKey-isUsed");
kinesis.putRecord(putRecReq);
} catch (ProvisionedThroughputExceededException exception) {
try {
System.out.println("ERROR: Throughput Exception Thrown.");
exception.printStackTrace();
System.out.println("Retrying after a short delay.");
Thread.sleep(100);
kinesis.putRecord(putRecReq);
} catch (ProvisionedThroughputExceededException e) {
e.printStackTrace();
System.out.println("Kinesis Put operation failed after re-try due to second consecutive "
+ "ProvisionedThroughputExceededException");
} catch (Exception e) {
e.printStackTrace();
System.out.println("Exception thrown while writing a record to Kinesis.");
}
} catch (Exception e) {
e.printStackTrace();
System.out.println("Exception thrown while writing a record to Kinesis.");
}
}