private void sentWatermarkToShards()

in kinesis-taxi-stream-producer/src/main/java/com/amazonaws/flink/refarch/utils/WatermarkTracker.java [67:91]


  private void sentWatermarkToShards() {
    try {
      //refresh the list of available shards, if current state is too old
      if (System.currentTimeMillis() - lastShardRefreshTime >= SHARD_REFRESH_MILLIES) {
        refreshShards();

        lastShardRefreshTime = System.currentTimeMillis();
      }

      //send a watermark to every shard of the Kinesis stream
      shards.parallelStream()
          .map(shard -> new PutRecordRequest()
              .withStreamName(streamName)
              .withData(new WatermarkEvent(currentWatermark).toByteBuffer())
              .withPartitionKey("23")
              .withExplicitHashKey(shard.getHashKeyRange().getStartingHashKey()))
          .map(kinesisClient::putRecord)
          .forEach(putRecordResult -> LOG.trace("send watermark {} to shard {}", new DateTime(currentWatermark), putRecordResult.getShardId()));

      LOG.debug("send watermark {}", new DateTime(currentWatermark));
    } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
      //if any request is throttled, just wait for the next iteration to submit another watermark
      LOG.warn("skipping watermark due to limit/throughput exceeded exception");
    }
  }