in flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java [104:139]
private String getShardIterator(
String streamArn, String shardId, StartingPosition startingPosition) {
GetShardIteratorRequest.Builder requestBuilder =
GetShardIteratorRequest.builder()
.streamARN(streamArn)
.shardId(shardId)
.shardIteratorType(startingPosition.getShardIteratorType());
switch (startingPosition.getShardIteratorType()) {
case TRIM_HORIZON:
case LATEST:
break;
case AT_TIMESTAMP:
if (startingPosition.getStartingMarker() instanceof Instant) {
requestBuilder =
requestBuilder.timestamp(
(Instant) startingPosition.getStartingMarker());
} else {
throw new IllegalArgumentException(
"Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_TIMESTAMP. Must be a Date object.");
}
break;
case AT_SEQUENCE_NUMBER:
case AFTER_SEQUENCE_NUMBER:
if (startingPosition.getStartingMarker() instanceof String) {
requestBuilder =
requestBuilder.startingSequenceNumber(
(String) startingPosition.getStartingMarker());
} else {
throw new IllegalArgumentException(
"Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER. Must be a String.");
}
}
return kinesisClient.getShardIterator(requestBuilder.build()).shardIterator();
}