in flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java [92:141]
public GetRecordsResponse getRecords(
        String streamArn, String shardId, StartingPosition startingPosition) {
    // Reads the next batch of records for the given shard.
    //
    // The shard iterator is looked up in shardIdToIteratorStore and is only requested via
    // getShardIterator(...) when no iterator is cached for the shard yet. After every
    // successful read the cached iterator is replaced by the response's next shard iterator,
    // if the response carries one.
    //
    // Recovery performed here:
    //   - ExpiredIteratorException: request a new iterator at startingPosition, retry once.
    //   - TrimmedDataAccessException: request a new iterator from the start of the shard
    //     (StartingPosition.fromStart()), retry once.
    //   - ResourceNotFoundException: log a warning and return EMPTY_GET_RECORDS_RESPONSE.
    // An exception thrown by a retry is not handled here and propagates to the caller.
    String shardIterator =
            shardIdToIteratorStore.computeIfAbsent(
                    shardId, (s) -> getShardIterator(streamArn, s, startingPosition));
    try {
        return getRecordsAndStoreNextIterator(shardId, shardIterator);
    } catch (ExpiredIteratorException e) {
        // Arguments are passed in the order the placeholders appear: shard first, then position.
        LOG.info(
                "Received ExpiredIteratorException from GetRecords. "
                        + "Calling GetShardIterator for shard: {} with position: {}",
                shardId,
                startingPosition);
        // Eagerly retry getRecords() if the iterator is expired
        return getRecordsAndStoreNextIterator(
                shardId, getShardIterator(streamArn, shardId, startingPosition));
    } catch (TrimmedDataAccessException e) {
        // TrimmedDataAccessException means that the record pointed to by the shard iterator
        // has expired. We should read the shard back from TRIM_HORIZON.
        LOG.warn(
                "Received TrimmedDataAccessException from GetRecords. "
                        + "Calling GetShardIterator for shard: {} with TRIM_HORIZON",
                shardId);
        return getRecordsAndStoreNextIterator(
                shardId, getShardIterator(streamArn, shardId, StartingPosition.fromStart()));
    } catch (ResourceNotFoundException e) {
        LOG.warn(
                "Received ResourceNotFoundException from GetRecords for shard: {}. "
                        + "This might indicate that there is restore happening from stale snapshot or data loss from backpressure",
                shardId);
        return EMPTY_GET_RECORDS_RESPONSE;
    }
}

/**
 * Fetches records with the given shard iterator and remembers the iterator to use for the
 * next call.
 *
 * @param shardId shard whose cached iterator is advanced
 * @param shardIterator iterator to read with; may be {@code null} when no iterator could be
 *     obtained for the shard
 * @return the response of the read, or {@code EMPTY_GET_RECORDS_RESPONSE} when {@code
 *     shardIterator} is {@code null}
 */
private GetRecordsResponse getRecordsAndStoreNextIterator(
        String shardId, String shardIterator) {
    // Without an iterator there is nothing to read; never hand null to getRecords(String).
    if (shardIterator == null) {
        return EMPTY_GET_RECORDS_RESPONSE;
    }
    GetRecordsResponse getRecordsResponse = getRecords(shardIterator);
    String nextShardIterator = getRecordsResponse.nextShardIterator();
    // The cached iterator is left untouched when the response carries no next iterator.
    if (nextShardIterator != null) {
        shardIdToIteratorStore.put(shardId, nextShardIterator);
    }
    return getRecordsResponse;
}