in pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSource.java [71:120]
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
this.dynamodbSourceConfig = DynamoDBSourceConfig.load(config);
checkArgument(isNotBlank(dynamodbSourceConfig.getAwsDynamodbStreamArn()), "empty dynamo-stream arn");
// Even if the endpoint is set, it seems to require a region to go with it
checkArgument(isNotBlank(dynamodbSourceConfig.getAwsRegion()),
"The aws-region must be set");
checkArgument(isNotBlank(dynamodbSourceConfig.getAwsCredentialPluginParam()), "empty aws-credential param");
if (dynamodbSourceConfig.getInitialPositionInStream() == InitialPositionInStream.AT_TIMESTAMP) {
checkArgument((dynamodbSourceConfig.getStartAtTime() != null),"Timestamp must be specified");
}
queue = new LinkedBlockingQueue<> (dynamodbSourceConfig.getReceiveQueueSize());
workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
AwsCredentialProviderPlugin credentialsProvider = createCredentialProvider(
dynamodbSourceConfig.getAwsCredentialPluginName(),
dynamodbSourceConfig.getAwsCredentialPluginParam());
AmazonDynamoDBStreams dynamoDBStreamsClient = dynamodbSourceConfig.buildDynamoDBStreamsClient(credentialsProvider);
AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoDBStreamsClient);
recordProcessorFactory = new StreamsRecordProcessorFactory(queue, dynamodbSourceConfig);
kinesisClientLibConfig = new KinesisClientLibConfiguration(dynamodbSourceConfig.getApplicationName(),
dynamodbSourceConfig.getAwsDynamodbStreamArn(),
credentialsProvider.getCredentialProvider(),
workerId)
.withRegionName(dynamodbSourceConfig.getAwsRegion())
.withInitialPositionInStream(dynamodbSourceConfig.getInitialPositionInStream());
if(kinesisClientLibConfig.getInitialPositionInStream() == InitialPositionInStream.AT_TIMESTAMP) {
kinesisClientLibConfig.withTimestampAtInitialPositionInStream(dynamodbSourceConfig.getStartAtTime());
}
worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(recordProcessorFactory,
kinesisClientLibConfig,
adapterClient,
dynamodbSourceConfig.buildDynamoDBClient(credentialsProvider),
dynamodbSourceConfig.buildCloudwatchClient(credentialsProvider));
workerThread = new Thread(worker);
workerThread.setDaemon(true);
threadEx = null;
workerThread.setUncaughtExceptionHandler((t, ex) -> {
threadEx = ex;
log.error("Worker died with error", ex);
});
workerThread.start();
}