in nodemanager/core/RemoteExecutor.cpp [417:476]
/// Handles an EndTask request for the task identified by args.JobId / args.TaskId.
///
/// Holds a reader lock on this->lock for the whole call. The task is looked up
/// in jobTaskTable; when it is found, TerminateTask is invoked with
/// ErrorCodes::EndTaskExitCode and the task's ExitCode is set to the same value.
///  - If TerminateTask returns nullptr or a stat whose IsTerminated() is true,
///    the task is removed from jobTaskTable, marked Exited, and its graceful
///    thread is cancelled.
///  - Otherwise the task is marked as not exited and a pthread running
///    RemoteExecutor::GracePeriodElapsed is started with the task identifiers,
///    callbackUri and args.TaskCancelGracePeriodSeconds.
///
/// @param args        End-task arguments; JobId, TaskId and
///                    TaskCancelGracePeriodSeconds are read here.
/// @param callbackUri Copied into the argument tuple handed to the grace-period
///                    thread.
/// @return A completed task holding taskInfo->ToJson() when the task was found,
///         or a default-constructed json::value when it was not.
pplx::task<json::value> RemoteExecutor::EndTask(hpc::arguments::EndTaskArgs&& args, std::string&& callbackUri)
{
    ReaderLock readerLock(&this->lock);
    Logger::Info(args.JobId, args.TaskId, this->UnknowId, "EndTask: starting");

    auto taskInfo = this->jobTaskTable.GetTask(args.JobId, args.TaskId);

    json::value jsonBody;

    if (taskInfo)
    {
        Logger::Debug(
            args.JobId, args.TaskId, taskInfo->GetTaskRequeueCount(),
            "EndTask for ProcessKey {0}, processes count {1}",
            taskInfo->ProcessKey, this->processes.size());

        // A grace period of zero is passed to TerminateTask as `true`
        // (presumably "terminate forcefully" -- confirm against TerminateTask).
        const auto* stat = this->TerminateTask(
            args.JobId, args.TaskId, taskInfo->GetTaskRequeueCount(),
            taskInfo->ProcessKey,
            static_cast<int>(ErrorCodes::EndTaskExitCode),
            args.TaskCancelGracePeriodSeconds == 0,
            !taskInfo->IsPrimaryTask);

        taskInfo->ExitCode = static_cast<int>(ErrorCodes::EndTaskExitCode);

        if (stat == nullptr || stat->IsTerminated())
        {
            // Nothing left to wait for: drop the task from the table and
            // stop any grace-period thread that an earlier call started.
            this->jobTaskTable.RemoveTask(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetAttemptId());
            taskInfo->Exited = true;
            taskInfo->CancelGracefulThread();

            if (stat != nullptr)
            {
                taskInfo->AssignFromStat(*stat);
            }
        }
        else
        {
            taskInfo->Exited = false;
            taskInfo->AssignFromStat(*stat);

            // Start a thread to kill the task after a period of time.
            // NOTE(review): GracePeriodElapsed is expected to take ownership of
            // this tuple and free it -- not visible from here, confirm.
            using GraceArgs = std::tuple<int, int, int, uint64_t, std::string, int, RemoteExecutor*>;
            auto* taskIds = new GraceArgs(
                taskInfo->JobId, taskInfo->TaskId, taskInfo->GetTaskRequeueCount(), taskInfo->ProcessKey,
                callbackUri, args.TaskCancelGracePeriodSeconds, this);

            const int createResult = pthread_create(
                &taskInfo->GracefulThreadId, nullptr, RemoteExecutor::GracePeriodElapsed, taskIds);

            if (createResult != 0)
            {
                // pthread_create returns a non-zero error number on failure and
                // no thread is started, so nobody else will ever free the tuple.
                delete taskIds;
                Logger::Warn(
                    args.JobId, args.TaskId, this->UnknowId,
                    "EndTask: failed to start grace period thread, error {0}", createResult);
            }
        }

        jsonBody = taskInfo->ToJson();
        Logger::Info(args.JobId, args.TaskId, this->UnknowId, "EndTask: ended {0}", jsonBody);
    }
    else
    {
        Logger::Warn(args.JobId, args.TaskId, this->UnknowId, "EndTask: Task is already finished");
    }

    return pplx::task_from_result(jsonBody);
}