protected Scheduler()

in amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java [212:310]


    /**
     * Creates a scheduler from the supplied configuration objects.
     *
     * <p>The constructor stores each configuration object, derives from
     * {@code retrievalConfig.appStreamTracker()} whether it runs in multi-stream mode together with the
     * initial stream configuration map, and then builds its collaborators: lease coordinator and
     * refresher, checkpointer, executor service, shard sync task manager provider, leader decider,
     * periodic shard sync manager, lease cleanup manager and, only when a Glue schema registry
     * deserializer is configured, a schema registry decoder.
     *
     * @param checkpointConfig       provides the checkpoint factory used to create the checkpointer;
     *                               annotated {@code @NonNull}
     * @param coordinatorConfig      provides the application name, initialization and polling settings,
     *                               the coordinator factory, shard prioritization and the optional
     *                               graceful shutdown coordinator / worker state change listener;
     *                               annotated {@code @NonNull}
     * @param leaseManagementConfig  provides the lease management factory, worker identifier and the
     *                               lease related settings read below; annotated {@code @NonNull}
     * @param lifecycleConfig        provides the task backoff time and the aggregator util;
     *                               annotated {@code @NonNull}
     * @param metricsConfig          provides the metrics factory; annotated {@code @NonNull}
     * @param processorConfig        stored as is; annotated {@code @NonNull}
     * @param retrievalConfig        provides the stream tracker, the list-shards settings and the
     *                               optional Glue schema registry deserializer; annotated {@code @NonNull}
     * @param diagnosticEventFactory stored as is; annotated {@code @NonNull}
     */
    protected Scheduler(@NonNull final CheckpointConfig checkpointConfig,
                        @NonNull final CoordinatorConfig coordinatorConfig,
                        @NonNull final LeaseManagementConfig leaseManagementConfig,
                        @NonNull final LifecycleConfig lifecycleConfig,
                        @NonNull final MetricsConfig metricsConfig,
                        @NonNull final ProcessorConfig processorConfig,
                        @NonNull final RetrievalConfig retrievalConfig,
                        @NonNull final DiagnosticEventFactory diagnosticEventFactory) {
        // Keep a reference to everything that was handed in.
        this.checkpointConfig = checkpointConfig;
        this.coordinatorConfig = coordinatorConfig;
        this.leaseManagementConfig = leaseManagementConfig;
        this.lifecycleConfig = lifecycleConfig;
        this.metricsConfig = metricsConfig;
        this.processorConfig = processorConfig;
        this.retrievalConfig = retrievalConfig;
        this.diagnosticEventFactory = diagnosticEventFactory;

        this.applicationName = coordinatorConfig.applicationName();

        // appStreamTracker().map(...) takes one function for the multi-stream tracker case and one for
        // the single stream configuration case.
        this.isMultiStreamMode = retrievalConfig.appStreamTracker().map(tracker -> true, singleStream -> false);
        this.currentStreamConfigMap = retrievalConfig.appStreamTracker().map(
                tracker -> {
                    // The tracker-specific fields are only populated in the multi-stream case.
                    this.multiStreamTracker = tracker;
                    this.formerStreamsLeasesDeletionStrategy = tracker.formerStreamsLeasesDeletionStrategy();
                    this.orphanedStreamInitialPositionInStream = tracker.orphanedStreamInitialPositionInStream();
                    return tracker.streamConfigList().stream()
                            .collect(Collectors.toMap(config -> config.streamIdentifier(), config -> config));
                },
                singleStream -> Collections.singletonMap(singleStream.streamIdentifier(), singleStream));

        this.maxInitializationAttempts = coordinatorConfig.maxInitializationAttempts();
        this.metricsFactory = metricsConfig.metricsFactory();

        // The lease serializer flavour follows the stream mode.
        final LeaseSerializer leaseSerializer;
        if (isMultiStreamMode) {
            leaseSerializer = new DynamoDBMultiStreamLeaseSerializer();
        } else {
            leaseSerializer = new DynamoDBLeaseSerializer();
        }
        this.leaseCoordinator = leaseManagementConfig
                .leaseManagementFactory(leaseSerializer, isMultiStreamMode)
                .createLeaseCoordinator(metricsFactory);
        this.leaseRefresher = leaseCoordinator.leaseRefresher();

        // TODO: Figure out what to do with lease manage <=> checkpoint relationship
        this.checkpoint = checkpointConfig.checkpointFactory()
                .createCheckpointer(leaseCoordinator, leaseRefresher);

        // TODO: Move this configuration to lifecycle
        this.shardConsumerDispatchPollIntervalMillis = coordinatorConfig.shardConsumerDispatchPollIntervalMillis();
        this.parentShardPollIntervalMillis = coordinatorConfig.parentShardPollIntervalMillis();
        this.executorService = coordinatorConfig.coordinatorFactory().createExecutorService();
        this.diagnosticEventHandler = new DiagnosticEventLogger();
        // Evaluated lazily: the factory lookup happens each time the provider is applied.
        this.shardSyncTaskManagerProvider = config -> this.leaseManagementConfig
                .leaseManagementFactory(leaseSerializer, isMultiStreamMode)
                .createShardSyncTaskManager(this.metricsFactory, config);
        this.shardPrioritization = coordinatorConfig.shardPrioritization();
        this.cleanupLeasesUponShardCompletion = leaseManagementConfig.cleanupLeasesUponShardCompletion();
        this.skipShardSyncAtWorkerInitializationIfLeasesExist =
                coordinatorConfig.skipShardSyncAtWorkerInitializationIfLeasesExist();

        // An explicitly configured instance wins; otherwise the coordinator factory supplies one.
        this.gracefulShutdownCoordinator = coordinatorConfig.gracefulShutdownCoordinator() != null
                ? coordinatorConfig.gracefulShutdownCoordinator()
                : coordinatorConfig.coordinatorFactory().createGracefulShutdownCoordinator();
        this.workerStateChangeListener = coordinatorConfig.workerStateChangeListener() != null
                ? coordinatorConfig.workerStateChangeListener()
                : coordinatorConfig.coordinatorFactory().createWorkerStateChangeListener();

        this.leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(
                leaseRefresher,
                Executors.newSingleThreadScheduledExecutor(),
                PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT);
        this.failoverTimeMillis = leaseManagementConfig.failoverTimeMillis();
        this.taskBackoffTimeMillis = lifecycleConfig.taskBackoffTimeMillis();
        // Currently disabled:
//        this.retryGetRecordsInSeconds = this.retrievalConfig.retryGetRecordsInSeconds();
//        this.maxGetRecordsThreadPool = this.retrievalConfig.maxGetRecordsThreadPool();
        this.listShardsBackoffTimeMillis = retrievalConfig.listShardsBackoffTimeInMillis();
        this.maxListShardsRetryAttempts = retrievalConfig.maxListShardsRetryAttempts();
        this.shardDetectorProvider =
                config -> createOrGetShardSyncTaskManager(config).shardDetector();
        this.ignoreUnexpetedChildShards = leaseManagementConfig.ignoreUnexpectedChildShards();
        this.aggregatorUtil = lifecycleConfig.aggregatorUtil();
        this.hierarchicalShardSyncerProvider =
                config -> createOrGetShardSyncTaskManager(config).hierarchicalShardSyncer();
        this.schedulerInitializationBackoffTimeMillis = coordinatorConfig.schedulerInitializationBackoffTimeMillis();
        this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(
                leaseManagementConfig.workerIdentifier(),
                leaderDecider,
                leaseRefresher,
                currentStreamConfigMap,
                shardSyncTaskManagerProvider,
                isMultiStreamMode,
                metricsFactory,
                leaseManagementConfig.leasesRecoveryAuditorExecutionFrequencyMillis(),
                leaseManagementConfig.leasesRecoveryAuditorInconsistencyConfidenceThreshold());
        this.leaseCleanupManager = leaseManagementConfig
                .leaseManagementFactory(leaseSerializer, isMultiStreamMode)
                .createLeaseCleanupManager(metricsFactory);

        // A decoder is only created when a Glue schema registry deserializer has been configured.
        if (retrievalConfig.glueSchemaRegistryDeserializer() == null) {
            this.schemaRegistryDecoder = null;
        } else {
            this.schemaRegistryDecoder =
                    new SchemaRegistryDecoder(retrievalConfig.glueSchemaRegistryDeserializer());
        }
    }