in amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java [338:396]
void initialize() {
synchronized (lock) {
registerErrorHandlerForUndeliverableAsyncTaskExceptions();
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
boolean isDone = false;
Exception lastException = null;
for (int i = 0; (!isDone) && (i < maxInitializationAttempts); i++) {
try {
log.info("Initialization attempt {}", (i + 1));
log.info("Initializing LeaseCoordinator");
leaseCoordinator.initialize();
TaskResult result;
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
if (shouldInitiateLeaseSync()) {
log.info("Worker {} is initiating the lease sync.", leaseManagementConfig.workerIdentifier());
leaderElectedPeriodicShardSyncManager.syncShardsOnce();
}
} else {
log.info("Skipping shard sync per configuration setting (and lease table is not empty)");
}
leaseCleanupManager.start();
// If we reach this point, then we either skipped the lease sync or did not have any exception
// for any of the shard sync in the previous attempt.
if (!leaseCoordinator.isRunning()) {
log.info("Starting LeaseCoordinator");
leaseCoordinator.start();
} else {
log.info("LeaseCoordinator is already running. No need to start it.");
}
log.info("Scheduling periodicShardSync");
leaderElectedPeriodicShardSyncManager.start();
streamSyncWatch.start();
isDone = true;
} catch (Exception e) {
log.error("Caught exception when initializing LeaseCoordinator", e);
lastException = e;
}
if (!isDone) {
try {
Thread.sleep(schedulerInitializationBackoffTimeMillis);
leaderElectedPeriodicShardSyncManager.stop();
} catch (InterruptedException e) {
log.debug("Sleep interrupted while initializing worker.");
}
}
}
if (!isDone) {
throw new RuntimeException(lastException);
}
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED);
}
}