public static Worker createDynamoDbStreamsWorker()

in src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/StreamsWorkerFactory.java [44:68]


    public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, ExecutorService execService) {
        AmazonDynamoDBStreamsAdapterClient streamsClient = new AmazonDynamoDBStreamsAdapterClient(
            config.getKinesisCredentialsProvider(),
            config.getKinesisClientConfiguration());
        AmazonDynamoDB dynamoDBClient = createClient(AmazonDynamoDBClientBuilder.standard(),
            config.getDynamoDBCredentialsProvider(),
            config.getDynamoDBClientConfiguration(),
            config.getDynamoDBEndpoint(),
            config.getRegionName());
        KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode());
        return new Worker
            .Builder()
            .recordProcessorFactory(recordProcessorFactory)
            .config(config)
            .kinesisClient(streamsClient)
            .execService(execService)
            .kinesisProxy(getDynamoDBStreamsProxy(config, streamsClient))
            .shardSyncer(new DynamoDBStreamsShardSyncer(new StreamsLeaseCleanupValidator()))
            .shardPrioritization(config.getShardPrioritizationStrategy())
            .leaseManager(kinesisClientLeaseManager)
            .leaseTaker(new StreamsLeaseTaker<>(kinesisClientLeaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis())
                    .maxLeasesForWorker(config.getMaxLeasesForWorker()))
            .leaderDecider(new StreamsDeterministicShuffleShardSyncLeaderDecider(config, kinesisClientLeaseManager))
            .build();
    }