in amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java [212:310]
protected Scheduler(@NonNull final CheckpointConfig checkpointConfig,
@NonNull final CoordinatorConfig coordinatorConfig,
@NonNull final LeaseManagementConfig leaseManagementConfig,
@NonNull final LifecycleConfig lifecycleConfig,
@NonNull final MetricsConfig metricsConfig,
@NonNull final ProcessorConfig processorConfig,
@NonNull final RetrievalConfig retrievalConfig,
@NonNull final DiagnosticEventFactory diagnosticEventFactory) {
this.checkpointConfig = checkpointConfig;
this.coordinatorConfig = coordinatorConfig;
this.leaseManagementConfig = leaseManagementConfig;
this.lifecycleConfig = lifecycleConfig;
this.metricsConfig = metricsConfig;
this.processorConfig = processorConfig;
this.retrievalConfig = retrievalConfig;
this.applicationName = this.coordinatorConfig.applicationName();
this.isMultiStreamMode = this.retrievalConfig.appStreamTracker().map(
multiStreamTracker -> true, streamConfig -> false);
this.currentStreamConfigMap = this.retrievalConfig.appStreamTracker().map(
multiStreamTracker -> {
this.multiStreamTracker = multiStreamTracker;
this.formerStreamsLeasesDeletionStrategy = multiStreamTracker.formerStreamsLeasesDeletionStrategy();
this.orphanedStreamInitialPositionInStream = multiStreamTracker.orphanedStreamInitialPositionInStream();
return multiStreamTracker.streamConfigList().stream()
.collect(Collectors.toMap(sc -> sc.streamIdentifier(), sc -> sc));
},
streamConfig ->
Collections.singletonMap(streamConfig.streamIdentifier(), streamConfig));
this.maxInitializationAttempts = this.coordinatorConfig.maxInitializationAttempts();
this.metricsFactory = this.metricsConfig.metricsFactory();
// Determine leaseSerializer based on availability of MultiStreamTracker.
final LeaseSerializer leaseSerializer = isMultiStreamMode ?
new DynamoDBMultiStreamLeaseSerializer() :
new DynamoDBLeaseSerializer();
this.leaseCoordinator = this.leaseManagementConfig
.leaseManagementFactory(leaseSerializer, isMultiStreamMode)
.createLeaseCoordinator(this.metricsFactory);
this.leaseRefresher = this.leaseCoordinator.leaseRefresher();
//
// TODO: Figure out what to do with lease manage <=> checkpoint relationship
//
this.checkpoint = this.checkpointConfig.checkpointFactory().createCheckpointer(this.leaseCoordinator,
this.leaseRefresher);
//
// TODO: Move this configuration to lifecycle
//
this.shardConsumerDispatchPollIntervalMillis = this.coordinatorConfig.shardConsumerDispatchPollIntervalMillis();
this.parentShardPollIntervalMillis = this.coordinatorConfig.parentShardPollIntervalMillis();
this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService();
this.diagnosticEventFactory = diagnosticEventFactory;
this.diagnosticEventHandler = new DiagnosticEventLogger();
this.shardSyncTaskManagerProvider = streamConfig -> this.leaseManagementConfig
.leaseManagementFactory(leaseSerializer, isMultiStreamMode)
.createShardSyncTaskManager(this.metricsFactory, streamConfig);
this.shardPrioritization = this.coordinatorConfig.shardPrioritization();
this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion();
this.skipShardSyncAtWorkerInitializationIfLeasesExist =
this.coordinatorConfig.skipShardSyncAtWorkerInitializationIfLeasesExist();
if (coordinatorConfig.gracefulShutdownCoordinator() != null) {
this.gracefulShutdownCoordinator = coordinatorConfig.gracefulShutdownCoordinator();
} else {
this.gracefulShutdownCoordinator = this.coordinatorConfig.coordinatorFactory()
.createGracefulShutdownCoordinator();
}
if (coordinatorConfig.workerStateChangeListener() != null) {
this.workerStateChangeListener = coordinatorConfig.workerStateChangeListener();
} else {
this.workerStateChangeListener = this.coordinatorConfig.coordinatorFactory()
.createWorkerStateChangeListener();
}
this.leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseRefresher,
Executors.newSingleThreadScheduledExecutor(),
PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT);
this.failoverTimeMillis = this.leaseManagementConfig.failoverTimeMillis();
this.taskBackoffTimeMillis = this.lifecycleConfig.taskBackoffTimeMillis();
// this.retryGetRecordsInSeconds = this.retrievalConfig.retryGetRecordsInSeconds();
// this.maxGetRecordsThreadPool = this.retrievalConfig.maxGetRecordsThreadPool();
this.listShardsBackoffTimeMillis = this.retrievalConfig.listShardsBackoffTimeInMillis();
this.maxListShardsRetryAttempts = this.retrievalConfig.maxListShardsRetryAttempts();
this.shardDetectorProvider = streamConfig -> createOrGetShardSyncTaskManager(streamConfig).shardDetector();
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
this.hierarchicalShardSyncerProvider = streamConfig -> createOrGetShardSyncTaskManager(streamConfig).hierarchicalShardSyncer();
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(
leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap,
shardSyncTaskManagerProvider, isMultiStreamMode, metricsFactory,
leaseManagementConfig.leasesRecoveryAuditorExecutionFrequencyMillis(),
leaseManagementConfig.leasesRecoveryAuditorInconsistencyConfidenceThreshold());
this.leaseCleanupManager = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer, isMultiStreamMode)
.createLeaseCleanupManager(metricsFactory);
this.schemaRegistryDecoder =
this.retrievalConfig.glueSchemaRegistryDeserializer() == null ?
null
: new SchemaRegistryDecoder(this.retrievalConfig.glueSchemaRegistryDeserializer());
}