in flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java [202:242]
/**
 * Builds a {@link DynamoDbStreamsProxy} backed by a synchronous DynamoDB Streams client.
 *
 * <p>The AWS region is derived from {@code streamArn} and written into the client properties
 * first; the entries of {@code consumerConfig} are added afterwards (NOTE(review): this
 * presumably lets an explicitly configured region override the derived one - confirm against
 * {@code Configuration#addAllToProperties}).
 *
 * <p>The retry strategy starts from the SDK's default adaptive strategy and overrides the
 * attempt limit and both backoff strategies with values read from {@code sourceConfig}.
 *
 * <p>The HTTP client is only created once the client properties have been assembled and
 * validated, and it is closed again if building the streams client or the proxy fails, so a
 * failing call does not leave an unreferenced HTTP client behind.
 *
 * @param consumerConfig configuration whose entries are copied into the client properties
 * @return a proxy wrapping the newly created streams client and its HTTP client
 * @throws IllegalStateException if no region can be determined from the stream ARN
 */
private DynamoDbStreamsProxy createDynamoDbStreamsProxy(Configuration consumerConfig) {
    // Assemble and validate the client properties before acquiring any closeable resource.
    String region =
            AWSGeneralUtil.getRegionFromArn(streamArn)
                    .orElseThrow(
                            () ->
                                    new IllegalStateException(
                                            "Unable to determine region from stream arn"));
    Properties dynamoDbStreamsClientProperties = new Properties();
    dynamoDbStreamsClientProperties.put(AWSConfigConstants.AWS_REGION, region);
    consumerConfig.addAllToProperties(dynamoDbStreamsClientProperties);
    // NOTE(review): expected to reject invalid credential settings by throwing - confirm.
    AWSGeneralUtil.validateAwsCredentials(dynamoDbStreamsClientProperties);

    // Retry behaviour: one exponential backoff is used for both regular and throttling retries.
    int maxApiCallAttempts = sourceConfig.get(DYNAMODB_STREAMS_RETRY_COUNT);
    Duration minDescribeStreamDelay =
            sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MIN_DELAY);
    Duration maxDescribeStreamDelay =
            sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MAX_DELAY);
    BackoffStrategy backoffStrategy =
            BackoffStrategy.exponentialDelay(minDescribeStreamDelay, maxDescribeStreamDelay);
    AdaptiveRetryStrategy adaptiveRetryStrategy =
            SdkDefaultRetryStrategy.adaptiveRetryStrategy()
                    .toBuilder()
                    .maxAttempts(maxApiCallAttempts)
                    .backoffStrategy(backoffStrategy)
                    .throttlingBackoffStrategy(backoffStrategy)
                    .build();

    SdkHttpClient httpClient =
            AWSGeneralUtil.createSyncHttpClient(
                    AttributeMap.builder().build(), ApacheHttpClient.builder());
    try {
        DynamoDbStreamsClient dynamoDbStreamsClient =
                AWSClientUtil.createAwsSyncClient(
                        dynamoDbStreamsClientProperties,
                        httpClient,
                        DynamoDbStreamsClient.builder(),
                        ClientOverrideConfiguration.builder()
                                .retryStrategy(adaptiveRetryStrategy),
                        DynamodbStreamsSourceConfigConstants
                                .BASE_DDB_STREAMS_USER_AGENT_PREFIX_FORMAT,
                        DynamodbStreamsSourceConfigConstants
                                .DDB_STREAMS_CLIENT_USER_AGENT_PREFIX);
        return new DynamoDbStreamsProxy(dynamoDbStreamsClient, httpClient);
    } catch (RuntimeException e) {
        // Nothing else references the HTTP client yet, so release it before propagating.
        try {
            httpClient.close();
        } catch (RuntimeException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}