private DynamoDbStreamsProxy createDynamoDbStreamsProxy()

in flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java [202:242]


    /**
     * Creates a {@link DynamoDbStreamsProxy} wrapping a synchronous DynamoDB Streams client.
     *
     * <p>The client properties are seeded with the region parsed from {@code streamArn}, after
     * which all entries of {@code consumerConfig} are added to them. The resulting properties are
     * validated for AWS credentials before any client is built. Retry behaviour (max attempts and
     * exponential backoff bounds) is read from {@code sourceConfig}.
     *
     * <p>The HTTP client is allocated only after all validation has passed, and it is closed if
     * building the AWS client or the proxy fails, so a failure here does not leak it.
     *
     * <p>NOTE(review): {@code consumerConfig} entries are added after the ARN-derived region, so a
     * region key present in {@code consumerConfig} presumably overrides it - confirm this is
     * intended.
     *
     * @param consumerConfig configuration whose entries are copied into the client properties
     * @return a proxy holding the created client and its underlying HTTP client
     * @throws IllegalStateException if no region can be determined from the stream ARN
     */
    private DynamoDbStreamsProxy createDynamoDbStreamsProxy(Configuration consumerConfig) {
        // Resolve and validate everything that can fail cheaply before any closeable resource
        // is allocated.
        String region =
                AWSGeneralUtil.getRegionFromArn(streamArn)
                        .orElseThrow(
                                () ->
                                        new IllegalStateException(
                                                "Unable to determine region from stream arn"));

        Properties dynamoDbStreamsClientProperties = new Properties();
        dynamoDbStreamsClientProperties.put(AWSConfigConstants.AWS_REGION, region);
        consumerConfig.addAllToProperties(dynamoDbStreamsClientProperties);
        AWSGeneralUtil.validateAwsCredentials(dynamoDbStreamsClientProperties);

        int maxApiCallAttempts = sourceConfig.get(DYNAMODB_STREAMS_RETRY_COUNT);
        Duration minDescribeStreamDelay =
                sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MIN_DELAY);
        Duration maxDescribeStreamDelay =
                sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MAX_DELAY);
        // The same exponential backoff is applied to regular and throttling retries.
        BackoffStrategy backoffStrategy =
                BackoffStrategy.exponentialDelay(minDescribeStreamDelay, maxDescribeStreamDelay);
        AdaptiveRetryStrategy adaptiveRetryStrategy =
                SdkDefaultRetryStrategy.adaptiveRetryStrategy()
                        .toBuilder()
                        .maxAttempts(maxApiCallAttempts)
                        .backoffStrategy(backoffStrategy)
                        .throttlingBackoffStrategy(backoffStrategy)
                        .build();

        SdkHttpClient httpClient =
                AWSGeneralUtil.createSyncHttpClient(
                        AttributeMap.builder().build(), ApacheHttpClient.builder());
        try {
            DynamoDbStreamsClient dynamoDbStreamsClient =
                    AWSClientUtil.createAwsSyncClient(
                            dynamoDbStreamsClientProperties,
                            httpClient,
                            DynamoDbStreamsClient.builder(),
                            ClientOverrideConfiguration.builder()
                                    .retryStrategy(adaptiveRetryStrategy),
                            DynamodbStreamsSourceConfigConstants
                                    .BASE_DDB_STREAMS_USER_AGENT_PREFIX_FORMAT,
                            DynamodbStreamsSourceConfigConstants
                                    .DDB_STREAMS_CLIENT_USER_AGENT_PREFIX);
            // From here on the proxy holds both clients.
            return new DynamoDbStreamsProxy(dynamoDbStreamsClient, httpClient);
        } catch (RuntimeException e) {
            // Nobody else holds the HTTP client yet; release it before propagating the failure.
            try {
                httpClient.close();
            } catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }