pplx::task<json::value> RemoteExecutor::EndJob()

in nodemanager/core/RemoteExecutor.cpp [284:415]


// Ends the job identified by args.JobId on this node.
//
// Steps, in order:
//   1. Removes the job from jobTaskTable and calls TerminateTask for every task
//      it still holds, using ErrorCodes::EndJobExitCode as the exit code. When
//      TerminateTask returns a stat, the task's Exited/ExitCode fields and
//      statistics are updated from it and CancelGracefulThread() is called.
//   2. Removes the job from the jobUsers / userJobs bookkeeping. When no other
//      job is recorded for the same user, the SSH keys that were recorded as
//      added for this job are removed again.
//
// A WriterLock on this->lock is constructed before anything else
// (presumably held until the function returns - confirm against WriterLock).
//
// Returns an already-completed task holding jobInfo->ToJson(), or a
// default-constructed json::value when the job is not in jobTaskTable.
pplx::task<json::value> RemoteExecutor::EndJob(hpc::arguments::EndJobArgs&& args)
{
    WriterLock writerLock(&this->lock);

    Logger::Info(args.JobId, this->UnknowId, this->UnknowId, "EndJob: starting");

    auto jobInfo = this->jobTaskTable.RemoveJob(args.JobId);

    // Left default-constructed when the job is unknown.
    json::value jsonBody;

    if (jobInfo)
    {
        const int endJobExitCode = static_cast<int>(ErrorCodes::EndJobExitCode);

        for (auto& taskPair : jobInfo->Tasks)
        {
            auto taskInfo = taskPair.second;

            if (!taskInfo)
            {
                Logger::Warn(args.JobId, taskPair.first, this->UnknowId,
                    "EndJob: Task is already finished");

                // A job is not expected to carry an empty task entry; trap it in debug builds.
                assert(false);
                continue;
            }

            const auto requeueCount = taskInfo->GetTaskRequeueCount();

            // Log before terminating so the message describes what is about to happen.
            Logger::Debug(args.JobId, taskPair.first, requeueCount, "EndJob: Terminating task");

            const auto* stat = this->TerminateTask(
                args.JobId, taskPair.first, requeueCount,
                taskInfo->ProcessKey, endJobExitCode, true, !taskInfo->IsPrimaryTask);

            if (stat != nullptr)
            {
                taskInfo->Exited = stat->IsTerminated();
                taskInfo->ExitCode = endJobExitCode;
                taskInfo->AssignFromStat(*stat);
                taskInfo->CancelGracefulThread();
            }
        }

        jsonBody = jobInfo->ToJson();
        Logger::Info(args.JobId, this->UnknowId, this->UnknowId, "EndJob: ended {0}", jsonBody);
    }
    else
    {
        Logger::Warn(args.JobId, this->UnknowId, this->UnknowId, "EndJob: Job is already finished");
    }

    auto jobUser = this->jobUsers.find(args.JobId);

    if (jobUser != this->jobUsers.end())
    {
        // Tuple layout (as unpacked by the original code):
        //   <0> user name, <1> existed, <2> private key added,
        //   <3> public key added, <4> authorized key added, <5> public key.
        // The references stay valid until the entry is erased at the end of this scope.
        const auto& userEntry = jobUser->second;
        const auto& userName = std::get<0>(userEntry);

        Logger::Info(args.JobId, this->UnknowId, this->UnknowId, "EndJob: Cleanup user {0}", userName);
        auto userJob = this->userJobs.find(userName);

        // With no userJobs record at all, nothing else is known to use the user.
        bool cleanupUser = true;
        if (userJob != this->userJobs.end())
        {
            userJob->second.erase(args.JobId);

            // cleanup when no one is using the user;
            cleanupUser = userJob->second.empty();
            Logger::Info(args.JobId, this->UnknowId, this->UnknowId,
                "EndJob: {0} jobs associated with the user {1}", userJob->second.size(), userName);

            if (cleanupUser)
            {
                this->userJobs.erase(userJob);
            }
        }

        if (cleanupUser)
        {
            const bool privateKeyAdded = std::get<2>(userEntry);
            const bool publicKeyAdded = std::get<3>(userEntry);
            const bool authKeyAdded = std::get<4>(userEntry);
            const auto& publicKey = std::get<5>(userEntry);

            // The "existed" flag could be true for a later job, so the user is left
            // on the node, which is by design. Deleting the user
            // (System::DeleteUser for users that did not exist before the job) was
            // only a simple way of cleaning up and is currently disabled, so the
            // keys added for the job are always removed here.
            if (privateKeyAdded)
            {
                Logger::Info(args.JobId, this->UnknowId, this->UnknowId,
                    "EndJob: RemoveSshKey id_rsa: {0}", userName);

                System::RemoveSshKey(userName, "id_rsa");
            }

            if (publicKeyAdded)
            {
                Logger::Info(args.JobId, this->UnknowId, this->UnknowId,
                    "EndJob: RemoveSshKey id_rsa.pub: {0}", userName);

                System::RemoveSshKey(userName, "id_rsa.pub");
            }

            if (authKeyAdded)
            {
                Logger::Info(args.JobId, this->UnknowId, this->UnknowId,
                    "EndJob: RemoveAuthorizedKey {0}", userName);

                System::RemoveAuthorizedKey(userName, publicKey);
            }
        }

        // Erase last: userEntry / userName / publicKey refer into this element.
        this->jobUsers.erase(jobUser);
    }

    return pplx::task_from_result(jsonBody);
}