void initialize()

in amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java [338:396]


    void initialize() {
        synchronized (lock) {
            registerErrorHandlerForUndeliverableAsyncTaskExceptions();
            workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
            boolean isDone = false;
            Exception lastException = null;

            for (int i = 0; (!isDone) && (i < maxInitializationAttempts); i++) {
                try {
                    log.info("Initialization attempt {}", (i + 1));
                    log.info("Initializing LeaseCoordinator");
                    leaseCoordinator.initialize();

                    TaskResult result;
                    if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
                        if (shouldInitiateLeaseSync()) {
                            log.info("Worker {} is initiating the lease sync.", leaseManagementConfig.workerIdentifier());
                            leaderElectedPeriodicShardSyncManager.syncShardsOnce();

                        }
                    } else {
                        log.info("Skipping shard sync per configuration setting (and lease table is not empty)");
                    }

                    leaseCleanupManager.start();

                    // If we reach this point, then we either skipped the lease sync or did not have any exception
                    // for any of the shard sync in the previous attempt.
                    if (!leaseCoordinator.isRunning()) {
                        log.info("Starting LeaseCoordinator");
                        leaseCoordinator.start();
                    } else {
                        log.info("LeaseCoordinator is already running. No need to start it.");
                    }
                    log.info("Scheduling periodicShardSync");
                    leaderElectedPeriodicShardSyncManager.start();
                    streamSyncWatch.start();
                    isDone = true;
                } catch (Exception e) {
                    log.error("Caught exception when initializing LeaseCoordinator", e);
                    lastException = e;
                }

                if (!isDone) {
                    try {
                        Thread.sleep(schedulerInitializationBackoffTimeMillis);
                        leaderElectedPeriodicShardSyncManager.stop();
                    } catch (InterruptedException e) {
                        log.debug("Sleep interrupted while initializing worker.");
                    }
                }
            }

            if (!isDone) {
                throw new RuntimeException(lastException);
            }
            workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED);
        }
    }