in myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ReconcileService.java [50:82]
/**
 * Reconciles the task statuses held in {@code state} with Mesos.
 *
 * <p>Sends every known task status to {@code driver.reconcileTasks}, then retries up to
 * {@code MAX_RECONCILE_ATTEMPTS} times, each time only for the tasks whose stored status is
 * still older than the previous reconcile request. The method returns immediately when there
 * are no task statuses, stops retrying once no task remains outstanding, and aborts (restoring
 * the thread's interrupt flag) if the thread is interrupted while waiting between attempts.
 *
 * <p>This method blocks the calling thread while it sleeps between attempts.
 *
 * @param driver the scheduler driver used to issue the reconciliation requests
 */
public void reconcile(SchedulerDriver driver) {
  Collection<Protos.TaskStatus> taskStatuses = state.getTaskStatuses();
  if (taskStatuses.isEmpty()) {
    return;
  }
  LOGGER.info("Reconciling {} tasks.", taskStatuses.size());
  driver.reconcileTasks(taskStatuses);
  lastReconcileTime = new Date();

  for (int attempt = 1; attempt <= MAX_RECONCILE_ATTEMPTS; attempt++) {
    try {
      // Linear backoff: the wait grows by DEFAULT_RECONCILATION_DELAY_MS on every attempt.
      // TODO(mohit): maybe the backoff strategy should be configurable.
      Thread.sleep(DEFAULT_RECONCILATION_DELAY_MS * attempt);
    } catch (InterruptedException e) {
      LOGGER.error("Interrupted", e);
      // Restore the interrupt flag for the caller and stop retrying instead of
      // immediately firing the next reconcile request without any delay.
      Thread.currentThread().interrupt();
      return;
    }

    long lastReconcileMillis = lastReconcileTime.getTime();
    Collection<Protos.TaskStatus> notYetReconciled = new ArrayList<>();
    for (Protos.TaskStatus status : state.getTaskStatuses()) {
      // Mesos reports TaskStatus.timestamp in (fractional) seconds since the epoch, while
      // Date.getTime() is in milliseconds; convert before comparing.
      if (status.getTimestamp() * 1000.0 < lastReconcileMillis) {
        notYetReconciled.add(status);
      }
    }

    LOGGER.info("Reconcile attempt {} for {} tasks", attempt, notYetReconciled.size());
    if (notYetReconciled.isEmpty()) {
      // Nothing is outstanding. Passing an empty collection to reconcileTasks would ask
      // Mesos for an implicit reconciliation of every task, which is not what a retry wants.
      return;
    }
    driver.reconcileTasks(notYetReconciled);
    lastReconcileTime = new Date();
  }
}