in flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java [173:216]
/**
 * Requests a shard iterator for the given shard from the DynamoDB Streams client.
 *
 * <p>The iterator type is taken from {@code startingPosition}. For {@code AT_SEQUENCE_NUMBER}
 * and {@code AFTER_SEQUENCE_NUMBER} the starting marker is additionally sent as the sequence
 * number of the request; for every other iterator type no further request field is set.
 *
 * @param streamArn ARN of the stream that owns the shard
 * @param shardId id of the shard to obtain an iterator for
 * @param startingPosition position supplying the iterator type and, for sequence-number based
 *     types, the sequence number as its starting marker
 * @return the shard iterator returned by the service, or {@code null} when the call fails with
 *     {@link ResourceNotFoundException} or {@link TrimmedDataAccessException}
 * @throws IllegalArgumentException if the iterator type is sequence-number based and the
 *     starting marker is not a {@link String}
 */
private String getShardIterator(
        String streamArn, String shardId, StartingPosition startingPosition) {
    GetShardIteratorRequest.Builder requestBuilder =
            GetShardIteratorRequest.builder()
                    .streamArn(streamArn)
                    .shardId(shardId)
                    .shardIteratorType(startingPosition.getShardIteratorType());

    switch (startingPosition.getShardIteratorType()) {
        case TRIM_HORIZON:
        case LATEST:
            // These iterator types need nothing beyond the type itself.
            break;
        case AT_SEQUENCE_NUMBER:
        case AFTER_SEQUENCE_NUMBER:
            Object startingMarker = startingPosition.getStartingMarker();
            // instanceof also rejects a null marker.
            if (!(startingMarker instanceof String)) {
                throw new IllegalArgumentException(
                        "Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER. Must be a String.");
            }
            requestBuilder = requestBuilder.sequenceNumber((String) startingMarker);
            break;
        default:
            // Any other iterator type is forwarded as-is without extra request fields.
            break;
    }

    try {
        return dynamoDbStreamsClient.getShardIterator(requestBuilder.build()).shardIterator();
    } catch (ResourceNotFoundException e) {
        // The failure is reported to the caller as a null iterator instead of an exception.
        LOG.warn(
                "Received ResourceNotFoundException from GetShardIterator. "
                        + "Shard {} of stream {} is no longer valid, marking it as complete. "
                        + "This might indicate that there is restore happening from stale snapshot or data loss from backpressure",
                shardId,
                streamArn);
        return null;
    } catch (TrimmedDataAccessException e) {
        // Same handling as above: log and signal the condition with a null iterator.
        LOG.warn(
                "Received TrimmedDataAccessException from GetShardIterator. "
                        + "Shard {} of stream {} is no longer valid, marking it as complete. "
                        + "This might indicate that there is restore happening from stale snapshot or data loss from backpressure",
                shardId,
                streamArn);
        return null;
    }
}