in kinesis-taxi-stream-producer/src/main/java/com/amazonaws/flink/refarch/utils/WatermarkTracker.java [67:91]
private void sentWatermarkToShards() {
try {
//refresh the list of available shards, if current state is too old
if (System.currentTimeMillis() - lastShardRefreshTime >= SHARD_REFRESH_MILLIES) {
refreshShards();
lastShardRefreshTime = System.currentTimeMillis();
}
//send a watermark to every shard of the Kinesis stream
shards.parallelStream()
.map(shard -> new PutRecordRequest()
.withStreamName(streamName)
.withData(new WatermarkEvent(currentWatermark).toByteBuffer())
.withPartitionKey("23")
.withExplicitHashKey(shard.getHashKeyRange().getStartingHashKey()))
.map(kinesisClient::putRecord)
.forEach(putRecordResult -> LOG.trace("send watermark {} to shard {}", new DateTime(currentWatermark), putRecordResult.getShardId()));
LOG.debug("send watermark {}", new DateTime(currentWatermark));
} catch (LimitExceededException | ProvisionedThroughputExceededException e) {
//if any request is throttled, just wait for the next iteration to submit another watermark
LOG.warn("skipping watermark due to limit/throughput exceeded exception");
}
}