pplx::task ExecutionFilter::ExecuteFilter()

in nodemanager/filters/ExecutionFilter.cpp [29:159]


// Runs the external filter program registered for `filterType` against `input`
// and returns, asynchronously, the JSON value the filter printed to its stdout.
//
// Parameters:
//   filterType              key looked up in this->filterFiles to find the filter executable.
//   jobId/taskId/requeueCount  used for logging and to name the temporary working folder.
//   input                   JSON value serialized to a file that is handed to the filter as stdin.
//
// Returns:
//   - A completed task holding `input` unchanged when the filter file cannot be opened
//     (the filter is treated as "not installed" and skipped).
//   - Otherwise a task that completes with the JSON parsed from the filter's stdout file
//     once the filter process has finished with exit code 0.
//
// Throws / faults:
//   - std::runtime_error (synchronously) for an unknown filter type, or when the temporary
//     folder or stdin file cannot be created.
//   - The returned task faults with FilterException when the filter exits non-zero, and with
//     std::runtime_error when the stdout file cannot be opened. Exceptions from
//     json::value::parse propagate through the task as well.
//
// Cleanup policy: the working folder holds the serialized input. When DEBUG is not defined
// it is removed on every exit path (success or exception); when DEBUG is defined it is
// removed only on success so that a failed run can be inspected.
pplx::task<json::value> ExecutionFilter::ExecuteFilter(const std::string& filterType, int jobId, int taskId, int requeueCount, const json::value& input) const
{
    auto filterIt = this->filterFiles.find(filterType);
    if (filterIt == this->filterFiles.end())
    {
        Logger::Error(jobId, taskId, requeueCount, "Unknown filter type {0}", filterType);
        throw std::runtime_error(String::Join(" ", "Unknown filter type", filterType));
    }

    std::string filterFile = filterIt->second;

    // Probe with a temporary stream so the file handle is released right away instead of
    // staying open for the rest of this function.
    if (!std::ifstream(filterFile).good())
    {
        Logger::Info(jobId, taskId, requeueCount, "{0} not detected, skip", filterFile);
        return pplx::task_from_result(input);
    }

    // The process is started with the temp folder as its working directory (see the Process
    // arguments below), so a relative filter path has to be made absolute first.
    if (filterFile[0] != '/')
    {
        // NOTE(review): the command's result is not checked; if it fails and `pwd` stays
        // empty the path becomes "/<filterFile>". Confirm whether that needs handling.
        std::string pwd;
        System::ExecuteCommandOut(pwd, "pwd | tr -d '\n'");
        filterFile = pwd + "/" + filterFile;
    }

    // Build the template for the temp folder name; the trailing XXXXXX is presumably
    // substituted by System::CreateTempFolder (mkdtemp-style) -- TODO confirm.
    char folder[256];
    int written = snprintf(folder, sizeof(folder), "/dev/shm/nodemanager_executionfilter_%d_%d_%d.XXXXXX", jobId, taskId, requeueCount);
    if (written < 0 || static_cast<size_t>(written) >= sizeof(folder))
    {
        // Defensive: never hand a truncated or unformatted template to CreateTempFolder.
        Logger::Error(jobId, taskId, requeueCount, "{0} {1}: Failed to format temp folder name", filterType, filterFile);
        throw std::runtime_error(String::Join("", filterType, " ", filterFile, ": Failed to format temp folder name"));
    }

    int ret = System::CreateTempFolder(folder, "root");

    if (ret != 0)
    {
        Logger::Error(jobId, taskId, requeueCount, "{0} {1}: Failed to create folder {2}, exit code {3}", filterType, filterFile, folder, ret);
        throw std::runtime_error(String::Join("", filterType, " ", filterFile, ": Failed to create folder ", folder, ", ret ", ret));
    }

    std::string folderString = folder;

    // Single place that knows how to delete the working folder; copied by value into the
    // continuation below so it stays valid after this function returns.
    auto removeFolder = [folderString] ()
    {
        std::string temp;
        System::ExecuteCommandOut(temp, "rm -rf", folderString);
    };

#ifndef DEBUG // In Release build, we need to clean up the folder which may contains user credential information in any case
    try {
#endif // DEBUG

    std::string stdinFile = folderString + "/stdin.txt";
    ret = System::WriteStringToFile(stdinFile, input.serialize());

    if (ret != 0)
    {
        Logger::Error(jobId, taskId, requeueCount, "{0} {1}: Failed to create stdin file {2}, exit code {3}", filterType, filterFile, stdinFile, ret);
        throw std::runtime_error(String::Join("", filterType, " ", filterFile, ": Failed to create stdin file ", stdinFile, ", exit code ", ret));
    }

    // stdout and stderr of the filter are deliberately directed to the same file.
    std::string stdoutFile = folderString + "/stdout.txt";
    std::string stderrFile = stdoutFile;

    // The completion callback is intentionally empty: the result is picked up through
    // OnCompleted() / GetExitCode() / GetExecutionMessage() below.
    std::shared_ptr<Process> p = std::make_shared<Process>(
        jobId, taskId, requeueCount, "Filter", filterFile, stdoutFile, stderrFile, stdinFile, folderString, "root", false,
        std::vector<uint64_t>(), std::map<std::string, std::string>(),
        [=] (int exitCode, std::string&& message, const ProcessStatistics& stat)
        {
        });

    // NOTE(review): the task returned by this continuation is discarded. If Start() faults,
    // the exception is never observed here -- confirm this is acceptable for PPLX.
    p->Start(p).then([=] (std::pair<pid_t, pthread_t> ids)
    {
        Logger::Info(jobId, taskId, requeueCount, "{0} {1}: pid {2} tid {3}", filterType, filterFile, ids.first, ids.second);
    });

    return p->OnCompleted().then([=] (pplx::task<void> t)
    {

#ifndef DEBUG
        try {
#endif // DEBUG

        // Read the process results first, then call get() so that any exception carried
        // by the completion task is rethrown here.
        int ret = p->GetExitCode();
        std::string executionMessage = p->GetExecutionMessage();
        t.get();

        if (0 == ret)
        {
            std::ifstream fsStdout(stdoutFile, std::ios::in);

            json::value output;
            if (fsStdout)
            {
                std::string content((std::istreambuf_iterator<char>(fsStdout)), std::istreambuf_iterator<char>());
                Logger::Info(jobId, taskId, requeueCount, "{0} {1}: plugin output read", filterType, filterFile);
                output = json::value::parse(content);
                fsStdout.close();

                // In Debug build, only clean up the folder when success.
                removeFolder();
                return output;
            }
            else
            {
                throw std::runtime_error(String::Join("", filterType, " ", filterFile, ": Unable to read stdout file ", stdoutFile, ", exit code ", (int)ErrorCodes::ReadFileError));
            }
        }
        else
        {
            throw FilterException(ret, String::Join("", filterType, " ", filterFile, ": Filter returned exit code ", ret, ", execution message ", executionMessage));
        }

#ifndef DEBUG
        }
        catch (...)
        {
            // Release build: never leave the serialized input behind, then propagate.
            removeFolder();
            throw;
        }
#endif // DEBUG

    })
    // Drop this continuation's reference to the Process, then forward the result
    // (or rethrow the stored exception) via get().
    .then([=] (pplx::task<json::value> t) mutable -> json::value { p.reset(); return t.get(); } );

#ifndef DEBUG
    }
    catch (...)
    {
        // Release build: a synchronous failure after the folder was created must also
        // remove it before the exception leaves this function.
        removeFolder();
        throw;
    }
#endif // DEBUG

}