pplx::task RemoteExecutor::EndTask()

in nodemanager/core/RemoteExecutor.cpp [417:476]


pplx::task<json::value> RemoteExecutor::EndTask(hpc::arguments::EndTaskArgs&& args, std::string&& callbackUri)
{
    ReaderLock readerLock(&this->lock);
    Logger::Info(args.JobId, args.TaskId, this->UnknowId, "EndTask: starting");

    auto taskInfo = this->jobTaskTable.GetTask(args.JobId, args.TaskId);

    json::value jsonBody;

    if (taskInfo)
    {
        Logger::Debug(
            args.JobId, args.TaskId, taskInfo->GetTaskRequeueCount(),
            "EndTask for ProcessKey {0}, processes count {1}",
            taskInfo->ProcessKey, this->processes.size());

        const auto* stat = this->TerminateTask(
            args.JobId, args.TaskId, taskInfo->GetTaskRequeueCount(),
            taskInfo->ProcessKey,
            (int)ErrorCodes::EndTaskExitCode,
            args.TaskCancelGracePeriodSeconds == 0,
            !taskInfo->IsPrimaryTask);

        taskInfo->ExitCode = (int)ErrorCodes::EndTaskExitCode;

        if (stat == nullptr || stat->IsTerminated())
        {
            this->jobTaskTable.RemoveTask(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetAttemptId());

            taskInfo->Exited = true;
            taskInfo->CancelGracefulThread();

            if (stat != nullptr)
            {
                taskInfo->AssignFromStat(*stat);
            }
        }
        else
        {
            taskInfo->Exited = false;
            taskInfo->AssignFromStat(*stat);

            // start a thread to kill the task after a period of time;
            auto* taskIds = new std::tuple<int, int, int, uint64_t, std::string, int, RemoteExecutor*>(
                taskInfo->JobId, taskInfo->TaskId, taskInfo->GetTaskRequeueCount(), taskInfo->ProcessKey,
                callbackUri, args.TaskCancelGracePeriodSeconds, this);

            pthread_create(&taskInfo->GracefulThreadId, nullptr, RemoteExecutor::GracePeriodElapsed, taskIds);
        }

        jsonBody = taskInfo->ToJson();
        Logger::Info(args.JobId, args.TaskId, this->UnknowId, "EndTask: ended {0}", jsonBody);
    }
    else
    {
        Logger::Warn(args.JobId, args.TaskId, this->UnknowId, "EndTask: Task is already finished");
    }

    return pplx::task_from_result(jsonBody);
}