in amazon-kinesis-connector-flink/src/main/java/software/amazon/kinesis/connectors/flink/proxy/KinesisProxy.java [279:305]
public String getShardIterator(StreamShardHandle shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
.withStreamName(shard.getStreamName())
.withShardId(shard.getShard().getShardId())
.withShardIteratorType(shardIteratorType);
switch (ShardIteratorType.fromValue(shardIteratorType)) {
case TRIM_HORIZON:
case LATEST:
break;
case AT_TIMESTAMP:
if (startingMarker instanceof Date) {
getShardIteratorRequest.setTimestamp((Date) startingMarker);
} else {
throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_TIMESTAMP. Must be a Date object.");
}
break;
case AT_SEQUENCE_NUMBER:
case AFTER_SEQUENCE_NUMBER:
if (startingMarker instanceof String) {
getShardIteratorRequest.setStartingSequenceNumber((String) startingMarker);
} else {
throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER. Must be a String.");
}
}
return getShardIterator(getShardIteratorRequest);
}