in src/main/java/com/amazonaws/services/dynamodbv2/streams/connectors/CommandLineInterface.java [147:231]
/**
 * Builds a KCL {@link Worker} that replicates the source table's DynamoDB stream into the
 * destination table using the master-to-replicas pipeline.
 * <p>
 * Credentials are resolved through the default AWS credential provider chain. The source table's
 * latest stream ARN is looked up and validated (existence first, then
 * {@link DynamoDBConnectorUtilities#isStreamsEnabled} with
 * {@code DynamoDBConnectorConstants.NEW_AND_OLD}) before any KCL objects are built. The returned
 * worker is only constructed here; it is not started.
 *
 * @return a configured KCL worker for this connector task
 * @throws IllegalArgumentException if the source table has no stream ARN
 * @throws IllegalStateException if the stream is reported as not enabled / not ready
 */
public Worker createWorker() {
    // use default credential provider chain to locate appropriate credentials
    final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
    // initialize DynamoDB client and set the endpoint properly for source table / region
    final AmazonDynamoDB dynamodbClient = AmazonDynamoDBClientBuilder.standard()
        .withCredentials(credentialsProvider)
        .withEndpointConfiguration(createEndpointConfiguration(sourceRegion, sourceDynamodbEndpoint, AmazonDynamoDB.ENDPOINT_PREFIX))
        .build();
    // initialize Streams client
    final AwsClientBuilder.EndpointConfiguration streamsEndpointConfiguration = createEndpointConfiguration(sourceRegion,
        sourceDynamodbStreamsEndpoint, AmazonDynamoDBStreams.ENDPOINT_PREFIX);
    final ClientConfiguration streamsClientConfig = new ClientConfiguration().withGzip(false);
    final AmazonDynamoDBStreams streamsClient = AmazonDynamoDBStreamsClientBuilder.standard()
        .withCredentials(credentialsProvider)
        .withEndpointConfiguration(streamsEndpointConfiguration)
        .withClientConfiguration(streamsClientConfig)
        .build();
    // obtain the Stream ARN associated with the source table; the source-table client is only
    // needed for this lookup, so release its resources as soon as it is done
    final String streamArn;
    try {
        streamArn = dynamodbClient.describeTable(sourceTable).getTable().getLatestStreamArn();
    } finally {
        dynamodbClient.shutdown();
    }
    // validate the ARN BEFORE using it, so a table without a stream fails with a clear message
    // instead of passing null into the stream-status check
    Preconditions.checkArgument(streamArn != null, DynamoDBConnectorConstants.MSG_NO_STREAMS_FOUND);
    final boolean streamEnabled = DynamoDBConnectorUtilities.isStreamsEnabled(streamsClient, streamArn, DynamoDBConnectorConstants.NEW_AND_OLD);
    Preconditions.checkState(streamEnabled, DynamoDBConnectorConstants.STREAM_NOT_READY);
    // initialize DynamoDB client for KCL
    final AmazonDynamoDB kclDynamoDBClient = AmazonDynamoDBClientBuilder.standard()
        .withCredentials(credentialsProvider)
        .withEndpointConfiguration(createKclDynamoDbEndpointConfiguration())
        .build();
    // wrap the Streams client (its endpoint is configured above) in the DynamoDB Streams Adapter for KCL
    final AmazonDynamoDBStreamsAdapterClient streamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(streamsClient);
    // initialize CloudWatch client and set the region to emit metrics to;
    // fall back to a no-op implementation when metric publishing is disabled
    final AmazonCloudWatch kclCloudWatchClient;
    if (isPublishCloudWatch) {
        kclCloudWatchClient = AmazonCloudWatchClientBuilder.standard()
            .withCredentials(credentialsProvider)
            .withRegion(kclRegion.or(sourceRegion).getName()).build();
    } else {
        kclCloudWatchClient = new NoopCloudWatch();
    }
    // endpoint of the destination table the pipeline writes to
    final AwsClientBuilder.EndpointConfiguration destinationEndpointConfiguration = createEndpointConfiguration(destinationRegion,
        destinationDynamodbEndpoint, AmazonDynamoDB.ENDPOINT_PREFIX);
    // try to get taskname from command line arguments, auto generate one if needed
    final String actualTaskName = DynamoDBConnectorUtilities.getTaskName(sourceRegion, destinationRegion, taskName, sourceTable, destinationTable);
    // set the appropriate Connector properties for the destination KCL configuration
    final Properties properties = new Properties();
    properties.put(DynamoDBStreamsConnectorConfiguration.PROP_APP_NAME, actualTaskName);
    properties.put(DynamoDBStreamsConnectorConfiguration.PROP_DYNAMODB_ENDPOINT, destinationEndpointConfiguration.getServiceEndpoint());
    properties.put(DynamoDBStreamsConnectorConfiguration.PROP_DYNAMODB_DATA_TABLE_NAME, destinationTable);
    properties.put(DynamoDBStreamsConnectorConfiguration.PROP_REGION_NAME, destinationRegion.getName());
    // create the record processor factory based on given pipeline and connector configurations
    // use the master to replicas pipeline
    final KinesisConnectorRecordProcessorFactory<Record, Record> factory = new KinesisConnectorRecordProcessorFactory<>(
        new DynamoDBMasterToReplicasPipeline(), new DynamoDBStreamsConnectorConfiguration(properties, credentialsProvider));
    // create the KCL configuration with default values; the worker id is made unique with a random UUID
    final KinesisClientLibConfiguration kclConfig = new KinesisClientLibConfiguration(actualTaskName,
        streamArn,
        credentialsProvider,
        DynamoDBConnectorConstants.WORKER_LABEL + actualTaskName + UUID.randomUUID().toString())
        // worker will use checkpoint table if available, otherwise it is safer
        // to start at beginning of the stream
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
        // we want the maximum batch size to avoid network transfer latency overhead
        .withMaxRecords(getRecordsLimit.or(DynamoDBConnectorConstants.STREAMS_RECORDS_LIMIT))
        // wait a reasonable amount of time - default 0.5 seconds
        .withIdleTimeBetweenReadsInMillis(DynamoDBConnectorConstants.IDLE_TIME_BETWEEN_READS)
        // Remove calls to GetShardIterator
        .withValidateSequenceNumberBeforeCheckpointing(false)
        // make parent shard poll interval tunable to decrease time to run integration test
        .withParentShardPollIntervalMillis(parentShardPollIntervalMillis.or(DynamoDBConnectorConstants.DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS))
        // avoid losing leases too often - default 60 seconds
        .withFailoverTimeMillis(DynamoDBConnectorConstants.KCL_FAILOVER_TIME);
    // create the KCL worker for this connector
    return new Worker(factory, kclConfig, streamsAdapterClient, kclDynamoDBClient, kclCloudWatchClient);
}