in nodemanager/core/RemoteExecutor.cpp [284:415]
/// Ends the job identified by args.JobId.
///
/// The job is removed from jobTaskTable, TerminateTask is invoked for every
/// task the removed job still holds, and the jobUsers/userJobs bookkeeping is
/// updated. When no other job is associated with the job's user any more, the
/// SSH key material recorded as added for that user is removed.
///
/// @param args  End-job request; only args.JobId is read here.
/// @return An already-completed task carrying jobInfo->ToJson() when the job
///         was found, or a default-constructed json::value otherwise.
pplx::task<json::value> RemoteExecutor::EndJob(hpc::arguments::EndJobArgs&& args)
{
    // NOTE(review): presumably held until the function returns (scoped lock) - confirm.
    WriterLock writerLock(&this->lock);

    Logger::Info(args.JobId, this->UnknowId, this->UnknowId, "EndJob: starting");

    auto jobInfo = this->jobTaskTable.RemoveJob(args.JobId);
    json::value jsonBody;

    if (!jobInfo)
    {
        Logger::Warn(args.JobId, this->UnknowId, this->UnknowId, "EndJob: Job is already finished");
    }
    else
    {
        for (auto& taskEntry : jobInfo->Tasks)
        {
            auto task = taskEntry.second;

            if (!task)
            {
                // A job is not expected to hold an empty task slot; flag it loudly in debug builds.
                Logger::Warn(args.JobId, taskEntry.first, this->UnknowId,
                    "EndJob: Task is already finished");
                assert(false);
                continue;
            }

            const auto* taskStat = this->TerminateTask(
                args.JobId,
                taskEntry.first,
                task->GetTaskRequeueCount(),
                task->ProcessKey,
                static_cast<int>(ErrorCodes::EndJobExitCode),
                true,
                !task->IsPrimaryTask);

            Logger::Debug(args.JobId, taskEntry.first, task->GetTaskRequeueCount(), "EndJob: Terminating task");

            if (taskStat == nullptr)
            {
                // No statistics were returned, so there is nothing to record on the task.
                continue;
            }

            task->Exited = taskStat->IsTerminated();
            task->ExitCode = static_cast<int>(ErrorCodes::EndJobExitCode);
            task->AssignFromStat(*taskStat);
            task->CancelGracefulThread();
        }

        jsonBody = jobInfo->ToJson();
        Logger::Info(args.JobId, this->UnknowId, this->UnknowId, "EndJob: ended {0}", jsonBody);
    }

    auto jobUserEntry = this->jobUsers.find(args.JobId);
    if (jobUserEntry == this->jobUsers.end())
    {
        // No user was recorded for this job: nothing further to clean up.
        return pplx::task_from_result(jsonBody);
    }

    // Tuple layout, as unpacked below:
    // <0> user name, <1> "existed" flag, <2> private key added,
    // <3> public key added, <4> authorized key added, <5> public key.
    auto& userRecord = jobUserEntry->second;

    Logger::Info(args.JobId, this->UnknowId, this->UnknowId, "EndJob: Cleanup user {0}", std::get<0>(userRecord));

    // With no userJobs entry at all, nobody else is using the user, so clean up.
    bool noJobsLeftForUser = true;
    auto userJobsEntry = this->userJobs.find(std::get<0>(userRecord));
    if (userJobsEntry != this->userJobs.end())
    {
        auto& jobsOfUser = userJobsEntry->second;
        jobsOfUser.erase(args.JobId);

        // Clean up only when this was the last job associated with the user.
        noJobsLeftForUser = jobsOfUser.empty();
        Logger::Info(args.JobId, this->UnknowId, this->UnknowId,
            "EndJob: {0} jobs associated with the user {1}", jobsOfUser.size(), std::get<0>(userRecord));

        if (noJobsLeftForUser)
        {
            this->userJobs.erase(userJobsEntry);
        }
    }

    if (noJobsLeftForUser)
    {
        std::string userName = std::get<0>(userRecord);
        const bool privateKeyAdded = std::get<2>(userRecord);
        const bool publicKeyAdded = std::get<3>(userRecord);
        const bool authKeyAdded = std::get<4>(userRecord);
        std::string publicKey = std::get<5>(userRecord);

        // Deleting the user itself (System::DeleteUser for users whose
        // "existed" flag is false) is currently disabled, so the user is left
        // on the node, which is by design; the "existed" flag could be true
        // for a later job anyway. Consequently the keys are always cleaned up
        // here rather than only when a user deletion failed.
        if (privateKeyAdded)
        {
            Logger::Info(args.JobId, this->UnknowId, this->UnknowId,
                "EndJob: RemoveSshKey id_rsa: {0}", userName);
            System::RemoveSshKey(userName, "id_rsa");
        }

        if (publicKeyAdded)
        {
            Logger::Info(args.JobId, this->UnknowId, this->UnknowId,
                "EndJob: RemoveSshKey id_rsa.pub: {0}", userName);
            System::RemoveSshKey(userName, "id_rsa.pub");
        }

        if (authKeyAdded)
        {
            Logger::Info(args.JobId, this->UnknowId, this->UnknowId,
                "EndJob: RemoveAuthorizedKey {0}", userName);
            System::RemoveAuthorizedKey(userName, publicKey);
        }
    }

    this->jobUsers.erase(jobUserEntry);

    return pplx::task_from_result(jsonBody);
}