in src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/StreamsWorkerFactory.java [44:68]
public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, ExecutorService execService) {
AmazonDynamoDBStreamsAdapterClient streamsClient = new AmazonDynamoDBStreamsAdapterClient(
config.getKinesisCredentialsProvider(),
config.getKinesisClientConfiguration());
AmazonDynamoDB dynamoDBClient = createClient(AmazonDynamoDBClientBuilder.standard(),
config.getDynamoDBCredentialsProvider(),
config.getDynamoDBClientConfiguration(),
config.getDynamoDBEndpoint(),
config.getRegionName());
KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode());
return new Worker
.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(config)
.kinesisClient(streamsClient)
.execService(execService)
.kinesisProxy(getDynamoDBStreamsProxy(config, streamsClient))
.shardSyncer(new DynamoDBStreamsShardSyncer(new StreamsLeaseCleanupValidator()))
.shardPrioritization(config.getShardPrioritizationStrategy())
.leaseManager(kinesisClientLeaseManager)
.leaseTaker(new StreamsLeaseTaker<>(kinesisClientLeaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis())
.maxLeasesForWorker(config.getMaxLeasesForWorker()))
.leaderDecider(new StreamsDeterministicShuffleShardSyncLeaderDecider(config, kinesisClientLeaseManager))
.build();
}