in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java [123:226]
/**
 * Runs one round of periodic task scheduling for this graph.
 *
 * <p>The round is skipped entirely when the graph has not started or is
 * already closed. Otherwise tasks are processed by status, in this order:
 * NEW, RUNNING, FAILED, CANCELLING, DELETING. Every phase stops iterating
 * as soon as this scheduler's {@code closed} flag is set.
 */
public void cronSchedule() {
    // Nothing to schedule before the graph is started or after it is closed
    if (!this.graph.started() || this.graph.closed()) {
        return;
    }
    startNewTasks();
    failOrphanedRunningTasks();
    retryFailedTasks();
    processCancellingTasks();
    processDeletingTasks();
}

/**
 * Tries to start every task in NEW status, stopping at the first task
 * that cannot be started.
 */
private void startNewTasks() {
    Iterator<HugeTask<Object>> news = queryTaskWithoutResultByStatus(
            TaskStatus.NEW);
    while (!this.closed.get() && news.hasNext()) {
        HugeTask<?> newTask = news.next();
        LOG.info("Try to start task({})@({}/{})", newTask.id(),
                 this.graphSpace, this.graphName);
        if (!tryStartHugeTask(newTask)) {
            // Task submission failed when the thread pool is full,
            // so trying the remaining tasks in this round is pointless.
            break;
        }
    }
}

/**
 * Marks tasks that are recorded as RUNNING but are not locked as FAILED,
 * and drops them from the local running-task map on success.
 */
private void failOrphanedRunningTasks() {
    Iterator<HugeTask<Object>> runnings =
            queryTaskWithoutResultByStatus(TaskStatus.RUNNING);
    while (!this.closed.get() && runnings.hasNext()) {
        HugeTask<?> running = runnings.next();
        initTaskParams(running);
        if (isLockedTask(running.id().toString())) {
            // The task lock is still held: leave the task alone
            continue;
        }
        LOG.info("Try to update task({})@({}/{}) status" +
                 "(RUNNING->FAILED)", running.id(), this.graphSpace,
                 this.graphName);
        if (updateStatusWithLock(running.id(), TaskStatus.RUNNING,
                                 TaskStatus.FAILED)) {
            runningTasks.remove(running.id());
        } else {
            LOG.warn("Update task({})@({}/{}) status" +
                     "(RUNNING->FAILED) failed",
                     running.id(), this.graphSpace, this.graphName);
        }
    }
}

/**
 * Moves FAILED tasks whose retry count is still below the configured
 * {@code CoreOptions.TASK_RETRY} limit back to NEW status.
 */
private void retryFailedTasks() {
    Iterator<HugeTask<Object>> faileds =
            queryTaskWithoutResultByStatus(TaskStatus.FAILED);
    while (!this.closed.get() && faileds.hasNext()) {
        HugeTask<?> failed = faileds.next();
        initTaskParams(failed);
        if (failed.retries() < this.graph().option(CoreOptions.TASK_RETRY)) {
            LOG.info("Try to update task({})@({}/{}) status(FAILED->NEW)",
                     failed.id(), this.graphSpace, this.graphName);
            updateStatusWithLock(failed.id(), TaskStatus.FAILED,
                                 TaskStatus.NEW);
        }
    }
}

/**
 * Handles tasks in CANCELLING status: cancels the ones tracked in the
 * local running-task map, and marks the ones that are neither tracked
 * locally nor locked as CANCELLED.
 */
private void processCancellingTasks() {
    Iterator<HugeTask<Object>> cancellings = queryTaskWithoutResultByStatus(
            TaskStatus.CANCELLING);
    while (!this.closed.get() && cancellings.hasNext()) {
        Id cancellingId = cancellings.next().id();
        /*
         * Look the task up once instead of containsKey() followed by
         * get(): if the entry were removed between the two calls, get()
         * would return null and cancel() would throw a
         * NullPointerException.
         */
        HugeTask<?> cancelling = runningTasks.get(cancellingId);
        if (cancelling != null) {
            initTaskParams(cancelling);
            LOG.info("Try to cancel task({})@({}/{})",
                     cancelling.id(), this.graphSpace, this.graphName);
            cancelling.cancel(true);
            runningTasks.remove(cancellingId);
        } else if (!isLockedTask(cancellingId.toString())) {
            // Not tracked locally and the task lock is not held, so no
            // node is executing the task: finish the cancellation here.
            updateStatusWithLock(cancellingId, TaskStatus.CANCELLING,
                                 TaskStatus.CANCELLED);
        }
    }
}

/**
 * Handles tasks in DELETING status: cancels and deletes the ones tracked
 * in the local running-task map, and deletes the ones that are neither
 * tracked locally nor locked.
 */
private void processDeletingTasks() {
    Iterator<HugeTask<Object>> deletings = queryTaskWithoutResultByStatus(
            TaskStatus.DELETING);
    while (!this.closed.get() && deletings.hasNext()) {
        Id deletingId = deletings.next().id();
        // Single lookup for the same reason as in processCancellingTasks():
        // avoids a null task if the entry disappears between two calls.
        HugeTask<?> deleting = runningTasks.get(deletingId);
        if (deleting != null) {
            initTaskParams(deleting);
            deleting.cancel(true);
            // Delete storage information
            deleteFromDB(deletingId);
            runningTasks.remove(deletingId);
        } else if (!isLockedTask(deletingId.toString())) {
            // Not tracked locally and the task lock is not held, so no
            // node is executing the task: it can be deleted directly.
            deleteFromDB(deletingId);
        }
    }
}