public void open()

in pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java [74:129]


    /**
     * Prepares this source for consumption: loads and validates the connector
     * configuration, assembles the Kinesis client library configuration and
     * starts the scheduler on a background daemon thread.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Load {@code kinesisSourceConfig} from the supplied map and validate
     *       that the stream name, the endpoint or region, and the credential
     *       plugin parameter are present; a start time is additionally required
     *       when the initial position is {@code AT_TIMESTAMP}.</li>
     *   <li>Create the bounded receive {@code queue} and a {@code workerId} of
     *       the form {@code <canonical-host-name>:<random-uuid>}.</li>
     *   <li>Build the credential provider, the Kinesis client, the record
     *       processor factory and the {@code configsBuilder}.</li>
     *   <li>Switch retrieval to polling unless enhanced fan-out is enabled, and
     *       apply the configured start position.</li>
     *   <li>Create the {@code scheduler} and run it on {@code schedulerThread}.
     *       Any throwable escaping that thread is stored in {@code threadEx};
     *       {@code threadEx} is reset to {@code null} before the thread starts.</li>
     * </ol>
     *
     * @param config        raw connector settings, parsed by {@code KinesisSourceConfig.load}
     * @param sourceContext connector runtime context; not referenced by this method
     * @throws Exception if a validation check fails (reported through
     *                   {@code checkArgument}), if the local host cannot be
     *                   resolved, or if any of the builders called here fail
     */
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        final KinesisSourceConfig cfg = KinesisSourceConfig.load(config);
        this.kinesisSourceConfig = cfg;

        // Reject incomplete configuration before any client is built.
        checkArgument(isNotBlank(cfg.getAwsKinesisStreamName()), "empty kinesis-stream name");
        checkArgument(
                isNotBlank(cfg.getAwsEndpoint()) || isNotBlank(cfg.getAwsRegion()),
                "Either the aws-end-point or aws-region must be set");
        checkArgument(isNotBlank(cfg.getAwsCredentialPluginParam()), "empty aws-credential param");
        // The start time is only consulted when reading from a timestamp
        // (short-circuit keeps it untouched for every other start position).
        checkArgument(
                cfg.getInitialPositionInStream() != InitialPositionInStream.AT_TIMESTAMP
                        || cfg.getStartAtTime() != null,
                "Timestamp must be specified");

        queue = new LinkedBlockingQueue<>(cfg.getReceiveQueueSize());
        // The random suffix keeps worker ids distinct even on the same host.
        final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
        workerId = hostName + ":" + UUID.randomUUID();

        final AwsCredentialProviderPlugin awsCredentials = createCredentialProvider(
                cfg.getAwsCredentialPluginName(),
                cfg.getAwsCredentialPluginParam());
        final KinesisAsyncClient kinesisClient = cfg.buildKinesisAsyncClient(awsCredentials);

        recordProcessorFactory = new KinesisRecordProcessorFactory(queue, cfg);
        configsBuilder = new ConfigsBuilder(
                cfg.getAwsKinesisStreamName(),
                cfg.getApplicationName(),
                kinesisClient,
                cfg.buildDynamoAsyncClient(awsCredentials),
                cfg.buildCloudwatchAsyncClient(awsCredentials),
                workerId,
                recordProcessorFactory);

        final RetrievalConfig retrieval = configsBuilder.retrievalConfig();
        if (!cfg.isUseEnhancedFanOut()) {
            // Without enhanced fan-out, records are fetched through the polling configuration.
            final PollingConfig polling = new PollingConfig(cfg.getAwsKinesisStreamName(), kinesisClient);
            retrieval.retrievalSpecificConfig(polling);
        }
        retrieval.initialPositionInStreamExtended(cfg.getStreamStartPosition());

        scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrieval);

        final Thread worker = new Thread(scheduler);
        schedulerThread = worker;
        worker.setDaemon(true);
        // Clear any failure left from a previous run before the new thread can report one.
        threadEx = null;
        worker.setUncaughtExceptionHandler((thread, failure) -> threadEx = failure);
        worker.start();
    }