public void cronSchedule()

in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java [123:226]


    public void cronSchedule() {
        // Perform periodic scheduling tasks

        if (!this.graph.started() || this.graph.closed()) {
            return;
        }

        // Handle tasks in NEW status
        Iterator<HugeTask<Object>> news = queryTaskWithoutResultByStatus(
            TaskStatus.NEW);

        while (!this.closed.get() && news.hasNext()) {
            HugeTask<?> newTask = news.next();
            LOG.info("Try to start task({})@({}/{})", newTask.id(),
                     this.graphSpace, this.graphName);
            if (!tryStartHugeTask(newTask)) {
                // Task submission failed when the thread pool is full.
                break;
            }
        }

        // Handling tasks in RUNNING state
        Iterator<HugeTask<Object>> runnings =
            queryTaskWithoutResultByStatus(TaskStatus.RUNNING);

        while (!this.closed.get() && runnings.hasNext()) {
            HugeTask<?> running = runnings.next();
            initTaskParams(running);
            if (!isLockedTask(running.id().toString())) {
                LOG.info("Try to update task({})@({}/{}) status" +
                         "(RUNNING->FAILED)", running.id(), this.graphSpace,
                         this.graphName);
                if (updateStatusWithLock(running.id(), TaskStatus.RUNNING,
                                         TaskStatus.FAILED)) {
                    runningTasks.remove(running.id());
                } else {
                    LOG.warn("Update task({})@({}/{}) status" +
                             "(RUNNING->FAILED) failed",
                             running.id(), this.graphSpace, this.graphName);
                }
            }
        }

        // Handle tasks in FAILED/HANGING state
        Iterator<HugeTask<Object>> faileds =
            queryTaskWithoutResultByStatus(TaskStatus.FAILED);

        while (!this.closed.get() && faileds.hasNext()) {
            HugeTask<?> failed = faileds.next();
            initTaskParams(failed);
            if (failed.retries() < this.graph().option(CoreOptions.TASK_RETRY)) {
                LOG.info("Try to update task({})@({}/{}) status(FAILED->NEW)",
                         failed.id(), this.graphSpace, this.graphName);
                updateStatusWithLock(failed.id(), TaskStatus.FAILED,
                                     TaskStatus.NEW);
            }
        }

        // Handling tasks in CANCELLING state
        Iterator<HugeTask<Object>> cancellings = queryTaskWithoutResultByStatus(
            TaskStatus.CANCELLING);

        while (!this.closed.get() && cancellings.hasNext()) {
            Id cancellingId = cancellings.next().id();
            if (runningTasks.containsKey(cancellingId)) {
                HugeTask<?> cancelling = runningTasks.get(cancellingId);
                initTaskParams(cancelling);
                LOG.info("Try to cancel task({})@({}/{})",
                         cancelling.id(), this.graphSpace, this.graphName);
                cancelling.cancel(true);

                runningTasks.remove(cancellingId);
            } else {
                // Local no execution task, but the current task has no nodes executing.
                if (!isLockedTask(cancellingId.toString())) {
                    updateStatusWithLock(cancellingId, TaskStatus.CANCELLING,
                                         TaskStatus.CANCELLED);
                }
            }
        }

        // Handling tasks in DELETING status
        Iterator<HugeTask<Object>> deletings = queryTaskWithoutResultByStatus(
            TaskStatus.DELETING);

        while (!this.closed.get() && deletings.hasNext()) {
            Id deletingId = deletings.next().id();
            if (runningTasks.containsKey(deletingId)) {
                HugeTask<?> deleting = runningTasks.get(deletingId);
                initTaskParams(deleting);
                deleting.cancel(true);

                // Delete storage information
                deleteFromDB(deletingId);

                runningTasks.remove(deletingId);
            } else {
                // Local has no task execution, but the current task has no nodes executing anymore.
                if (!isLockedTask(deletingId.toString())) {
                    deleteFromDB(deletingId);
                }
            }
        }
    }