in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java [351:383]
public String getShardIterator(
        StreamShardHandle shard, String shardIteratorType, @Nullable Object startingMarker)
        throws InterruptedException {
    // Builds a GetShardIteratorRequest for the given shard and hands it to the
    // request-based getShardIterator overload, whose result is returned as-is.
    //
    // startingMarker is interpreted according to shardIteratorType:
    //   - TRIM_HORIZON / LATEST: the marker is not read at all.
    //   - AT_TIMESTAMP: the marker must be a java.util.Date.
    //   - AT_SEQUENCE_NUMBER / AFTER_SEQUENCE_NUMBER: the marker must be a String.
    // A marker of the wrong type (including null) raises IllegalArgumentException.

    final String streamName = shard.getStreamName();

    // The request carries both the stream name and the ARN resolved from that name.
    final GetShardIteratorRequest request =
            new GetShardIteratorRequest()
                    .withStreamName(streamName)
                    .withStreamARN(lookupStreamArn(streamName))
                    .withShardId(shard.getShard().getShardId())
                    .withShardIteratorType(shardIteratorType);

    // Parsed only after the request is assembled, so lookupStreamArn runs before any
    // rejection of an unrecognised iterator type by fromValue.
    final ShardIteratorType iteratorType = ShardIteratorType.fromValue(shardIteratorType);

    if (iteratorType == ShardIteratorType.AT_TIMESTAMP) {
        if (!(startingMarker instanceof Date)) {
            throw new IllegalArgumentException(
                    "Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_TIMESTAMP. Must be a Date object.");
        }
        request.setTimestamp((Date) startingMarker);
    } else if (iteratorType == ShardIteratorType.AT_SEQUENCE_NUMBER
            || iteratorType == ShardIteratorType.AFTER_SEQUENCE_NUMBER) {
        if (!(startingMarker instanceof String)) {
            throw new IllegalArgumentException(
                    "Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER. Must be a String.");
        }
        request.setStartingSequenceNumber((String) startingMarker);
    }
    // Any other iterator type (TRIM_HORIZON, LATEST) needs no extra request field.

    return getShardIterator(request);
}