in pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java [74:129]
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
    // Loads the connector configuration from the supplied map, validates it,
    // builds the Kinesis / DynamoDB / CloudWatch async clients, assembles a
    // Scheduler from them and starts that scheduler on a daemon thread.
    // The sourceContext argument is not used by this method.
    final KinesisSourceConfig sourceConfig = KinesisSourceConfig.load(config);
    this.kinesisSourceConfig = sourceConfig;

    // --- validation: the checks run in this order, first failure wins ---
    final String streamName = sourceConfig.getAwsKinesisStreamName();
    checkArgument(isNotBlank(streamName), "empty kinesis-stream name");

    final boolean endpointOrRegionSet =
            isNotBlank(sourceConfig.getAwsEndpoint()) || isNotBlank(sourceConfig.getAwsRegion());
    checkArgument(endpointOrRegionSet, "Either the aws-end-point or aws-region must be set");

    final String credentialParam = sourceConfig.getAwsCredentialPluginParam();
    checkArgument(isNotBlank(credentialParam), "empty aws-credential param");

    // A start time is only required when the initial position is AT_TIMESTAMP;
    // the short-circuit keeps getStartAtTime() from being consulted otherwise.
    final boolean startTimeSatisfied =
            sourceConfig.getInitialPositionInStream() != InitialPositionInStream.AT_TIMESTAMP
                    || sourceConfig.getStartAtTime() != null;
    checkArgument(startTimeSatisfied, "Timestamp must be specified");

    // --- runtime state ---
    // Queue handed to the record processor factory below, bounded by the
    // configured receive queue size.
    queue = new LinkedBlockingQueue<>(sourceConfig.getReceiveQueueSize());

    // Worker id is "<canonical host name>:<random UUID>".
    final String canonicalHost = InetAddress.getLocalHost().getCanonicalHostName();
    workerId = canonicalHost + ":" + UUID.randomUUID();

    // --- AWS clients and scheduler configuration ---
    final AwsCredentialProviderPlugin awsCredentials = createCredentialProvider(
            sourceConfig.getAwsCredentialPluginName(),
            credentialParam);
    final KinesisAsyncClient kinesisClient = sourceConfig.buildKinesisAsyncClient(awsCredentials);

    recordProcessorFactory = new KinesisRecordProcessorFactory(queue, sourceConfig);
    configsBuilder = new ConfigsBuilder(
            streamName,
            sourceConfig.getApplicationName(),
            kinesisClient,
            sourceConfig.buildDynamoAsyncClient(awsCredentials),
            sourceConfig.buildCloudwatchAsyncClient(awsCredentials),
            workerId,
            recordProcessorFactory);

    final RetrievalConfig retrieval = configsBuilder.retrievalConfig();
    final boolean enhancedFanOut = sourceConfig.isUseEnhancedFanOut();
    if (!enhancedFanOut) {
        // Enhanced fan-out disabled: install a PollingConfig. Otherwise the
        // retrieval-specific config produced by ConfigsBuilder is left as is.
        retrieval.retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient));
    }
    retrieval.initialPositionInStreamExtended(sourceConfig.getStreamStartPosition());

    scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            retrieval);

    // --- start the scheduler on its own daemon thread ---
    schedulerThread = new Thread(scheduler);
    schedulerThread.setDaemon(true);
    // Reset any previously recorded failure; the handler stores whatever
    // throwable escapes the scheduler thread into threadEx.
    threadEx = null;
    schedulerThread.setUncaughtExceptionHandler((thread, failure) -> threadEx = failure);
    schedulerThread.start();
}