public void open()

in pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSource.java [71:120]


    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        this.dynamodbSourceConfig = DynamoDBSourceConfig.load(config);
        
        checkArgument(isNotBlank(dynamodbSourceConfig.getAwsDynamodbStreamArn()), "empty dynamo-stream arn");
//       Even if the endpoint is set, it seems to require a region to go with it
        checkArgument(isNotBlank(dynamodbSourceConfig.getAwsRegion()),
                     "The aws-region must be set");
        checkArgument(isNotBlank(dynamodbSourceConfig.getAwsCredentialPluginParam()), "empty aws-credential param");
        
        if (dynamodbSourceConfig.getInitialPositionInStream() == InitialPositionInStream.AT_TIMESTAMP) {
            checkArgument((dynamodbSourceConfig.getStartAtTime() != null),"Timestamp must be specified");
        }
        
        queue = new LinkedBlockingQueue<> (dynamodbSourceConfig.getReceiveQueueSize());
        workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
        
        AwsCredentialProviderPlugin credentialsProvider = createCredentialProvider(
                dynamodbSourceConfig.getAwsCredentialPluginName(),
                dynamodbSourceConfig.getAwsCredentialPluginParam());

        AmazonDynamoDBStreams dynamoDBStreamsClient = dynamodbSourceConfig.buildDynamoDBStreamsClient(credentialsProvider);
        AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoDBStreamsClient);
        recordProcessorFactory = new StreamsRecordProcessorFactory(queue, dynamodbSourceConfig);

        kinesisClientLibConfig = new KinesisClientLibConfiguration(dynamodbSourceConfig.getApplicationName(),
                dynamodbSourceConfig.getAwsDynamodbStreamArn(),
                credentialsProvider.getCredentialProvider(),
                workerId)
                .withRegionName(dynamodbSourceConfig.getAwsRegion())
                .withInitialPositionInStream(dynamodbSourceConfig.getInitialPositionInStream());

        if(kinesisClientLibConfig.getInitialPositionInStream() == InitialPositionInStream.AT_TIMESTAMP) {
            kinesisClientLibConfig.withTimestampAtInitialPositionInStream(dynamodbSourceConfig.getStartAtTime());
        }

        worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(recordProcessorFactory,
                kinesisClientLibConfig,
                adapterClient,
                dynamodbSourceConfig.buildDynamoDBClient(credentialsProvider),
                dynamodbSourceConfig.buildCloudwatchClient(credentialsProvider));

        workerThread = new Thread(worker);
        workerThread.setDaemon(true);
        threadEx = null;
        workerThread.setUncaughtExceptionHandler((t, ex) -> {
            threadEx = ex;
            log.error("Worker died with error", ex);
        });
        workerThread.start();
    }