void RequestHandler::putFileGetData()

in host/cxpslib/requesthandler.cpp [884:1147]


void RequestHandler::putFileGetData()
{
    SCOPE_GUARD sessionLogoutGuard = MAKE_SCOPE_GUARD(boost::bind(&RequestHandler::sessionLogout, this));
    SCOPE_GUARD resetGuard = MAKE_SCOPE_GUARD(boost::bind(&RequestHandler::reset, this));

    tagValue_t::iterator moreDataTagValue(m_requestInfo.m_params.find(HTTP_PARAM_TAG_MORE_DATA));
    if (m_requestInfo.m_params.end() == moreDataTagValue) {
        m_requestTelemetryData.SetRequestFailure(RequestFailure_PutFileMissingMoreData);
        badRequest(AT_LOC, "missing moredata");
        return;
    }
    m_putFileInfo.m_moreData = ('1' == (*moreDataTagValue).second[0]);

    tagValue_t::iterator createDirsTagValue(m_requestInfo.m_params.find(HTTP_PARAM_TAG_CREATE_DIRS));
    if (m_requestInfo.m_params.end() == createDirsTagValue) {
        m_putFileInfo.m_createDirs = false;
    } else {
        m_putFileInfo.m_createDirs = ('1' == (*createDirsTagValue).second[0]);
    }

    tagValue_t::iterator fileNameTagValue(m_requestInfo.m_params.find(HTTP_PARAM_TAG_NAME));
    if (m_requestInfo.m_params.end() == fileNameTagValue) {
        m_requestTelemetryData.SetRequestFailure(RequestFailure_MissingFileName);
        badRequest(AT_LOC, "missing name");
        return;
    }

    std::string& fileName = (*fileNameTagValue).second;
    // Defering the update the file name to telemetry  as soon as parsed. If the request telemetry
    // data has been pended from the last putfile(moredata=true), then overwriting the file name here
    // and on failing with PutfileInProgress would end up uploading the data for the new file.

    std::string ver;
    if (!getVersion(ver)) {
        m_requestTelemetryData.SetRequestFailure(RequestFailure_MissingVer);
        badRequest(AT_LOC, "missing ver");
        return;
    }

    tagValue_t::iterator idTagValue(m_requestInfo.m_params.find(HTTP_PARAM_TAG_ID));
    if (m_requestInfo.m_params.end() == idTagValue) {
        m_requestTelemetryData.SetRequestFailure(RequestFailure_MissingId);
        badRequest(AT_LOC, "missing id");
        return;
    }

    tagValue_t::iterator reqIdTagValue(m_requestInfo.m_params.find(HTTP_PARAM_TAG_REQ_ID));
    if (m_requestInfo.m_params.end() == reqIdTagValue) {
        m_requestTelemetryData.SetRequestFailure(RequestFailure_MissingReqId);
        badRequest(AT_LOC, "missing reqid");
        return;
    }

    boost::uint32_t reqId = boost::lexical_cast<boost::uint32_t>((*reqIdTagValue).second);
    if (reqId <= m_reqId) {
        m_requestTelemetryData.SetRequestFailure(RequestFailure_InvalidReqId);
        badRequest(AT_LOC, "invalid request id");
        return;
    }

    bool enableDiffResyncThrottle = true;
    tagValue_t::iterator filetypeTagValue(m_requestInfo.m_params.find(HTTP_PARAM_TAG_FILETYPE));
    if (m_requestInfo.m_params.end() == filetypeTagValue) {
        //CXPS_LOG_WARNING("filetype not sent. Diff and resync throttle will not work");
        enableDiffResyncThrottle = false;
    }
    else if(filetypeTagValue->second.empty())
    {
        //CXPS_LOG_WARNING("invalid filetype");
        enableDiffResyncThrottle = false;
    }
    else
    {
        m_putFileInfo.m_filetype = (CxpsTelemetry::FileType)boost::lexical_cast<int>(filetypeTagValue->second);
        //CXPS_LOG_ERROR("Filetype : " << m_putFileInfo.m_filetype);
    }

    tagValue_t::iterator diskidTagValue(m_requestInfo.m_params.find(HTTP_PARAM_TAG_DISKID));
    if (m_requestInfo.m_params.end() == diskidTagValue) {
        //CXPS_LOG_WARNING("diskid not sent. Diff and resync throttle will not work");
        enableDiffResyncThrottle = false;
    }
    else if (diskidTagValue->second.empty())
    {
        //CXPS_LOG_WARNING("invalid diskid");
        enableDiffResyncThrottle = false;
    }
    else
    {
        m_putFileInfo.m_deviceId = diskidTagValue->second;
        boost::algorithm::to_lower(m_putFileInfo.m_deviceId);
        //CXPS_LOG_ERROR("diskid : " << m_putFileInfo.m_deviceId);
    }

    if (!Authentication::verifyPutFileId(m_hostId,
                                         m_serverOptions->password(),
                                         HTTP_METHOD_POST,
                                         HTTP_REQUEST_PUTFILE,
                                         m_cnonce,
                                         m_sessionId,
                                         m_snonce,
                                         fileName,
                                         (m_putFileInfo.m_moreData ? '1' : '0'),
                                         ver,
                                         reqId,
                                         (*idTagValue).second)) {
        m_requestTelemetryData.SetRequestFailure(RequestFailure_VerifPutFile);

        std::string msg("invalid id for host: ");
        msg += m_hostId;
        msg += " request: ";
        msg += HTTP_REQUEST_PUTFILE;
        badRequest(AT_LOC, msg.c_str());
        return;
    }
    m_reqId = reqId;

    std::replace(fileName.begin(), fileName.end(), '\\', '/');

    boost::filesystem::path fullPathName;
    getFullPathName(fileName, fullPathName);

    if (!m_putFileInfo.m_name.empty() && fullPathName != m_putFileInfo.m_name) {
        std::stringstream errStr;
        errStr << "putfile request for " << fullPathName
               << " but current putfile indicates " << m_putFileInfo.m_name << " is being processed";
        if (m_putFileInfo.m_moreData) {
            // if this throws, then it is most likely request error
            if (putFileInProgress(errStr.str().c_str())) {
                return;
            }
        }

        m_requestTelemetryData.SetRequestFailure(RequestFailure_PutFileExpectedMoreData);
        // VERIFY: if we get here, then it means there is no more data for the current
        // putfile, but some how still have info about the last putfile. this should not happen
        // so for now treat it like an error. we could just assume a reset was missed
        // do the reset and continue with this request
        throw ERROR_EXCEPTION << errStr.str() << " but more data is false";
    }
    m_requestTelemetryData.AcquiredFilePath(fileName);

    // create the paths before checking throttle
    if (!fullPathName.empty() && (m_putFileInfo.m_createDirs || m_serverOptions->createPaths())) {
        CreatePaths::createPathsAsNeeded(fullPathName);
    }

    // Throttling should work only for diff and resync files, and not for other files
    // To prevent other types of files from being throttled, we should only throttle files with extension .dat
    // To do: sadewang-1912
    // Change this logic to identify file types based on headers and not extension
    if (!fullPathName.empty() && fullPathName.has_extension() && boost::iequals(fullPathName.extension().string(), ".dat"))
    {
        checkForThrottle(enableDiffResyncThrottle, fullPathName);
        if (m_putFileInfo.m_isCumulativeThrottled || m_putFileInfo.m_isDiffThrottled || m_putFileInfo.m_isResyncThrottled)
        {
            setThrottleRequestFailureInTelemetry();
            m_putFileInfo.m_bytesLeft = m_putFileInfo.m_totalBytesLeftToRead;
            resetGuard.dismiss();
            sessionLogoutGuard.dismiss();
            if (m_putFileInfo.m_bytesLeft > 0)
            {
                ReadEntireDataFromSocketAsync();
            }
            else
            {
                ReadEntireDataAndSendThrottle();
            }
            return;
        }
    }

    if (m_putFileInfo.m_fio.is_open()) {
        m_putFileInfo.m_openFileIsNeeded = false;
    } else {
        m_putFileInfo.m_openFileIsNeeded = true;
        m_putFileInfo.m_sentName = fileName;
        {
            // Acquire this mutex before changing m_putFileInfo.m_name
            boost::mutex::scoped_lock guard(m_interSessionCommunicationMutex);
            m_putFileInfo.m_name = fullPathName;
        }

        // CHECK: is the good enough?
        extendedLengthPath_t extName(ExtendedLengthPath::name(m_putFileInfo.m_name.string()));
        if (boost::filesystem::exists(extName)) {
            std::string sessionId = g_sessionTracker->checkOpenFile(fullPathName, false, true);
            if (!sessionId.empty()) {
                m_requestTelemetryData.SetRequestFailure(RequestFailure_FileOpenInSession);

                m_putFileInfo.m_name.clear(); // this prevents this session from deleting the putfile while another session is using it
                throw ERROR_EXCEPTION << "(sid: " << m_sessionId << ") putfile "
                                      << m_putFileInfo.m_name.string() << " currently opened by session "
                                      << sessionId << " can not be opened by multiple sessions";
            }

            CXPS_LOG_WARNING(AT_LOC << "(sid: " << m_sessionId << ") putfile " << m_putFileInfo.m_name.string()
                             << " already exists. it will be overwritten");
        }

        try {
            if (!m_putFileInfo.m_fio.open(extName.string().c_str(), FIO::FIO_OVERWRITE)) {
                m_requestTelemetryData.SetRequestFailure(RequestFailure_FileOpenFailed);
                throw ERROR_EXCEPTION << "error opening file " << m_putFileInfo.m_name.string() << ": " << m_putFileInfo.m_fio.errorAsString();
            }
        } catch (std::exception const& e) {
            m_requestTelemetryData.SetRequestFailure(RequestFailure_FileOpenFailed);
            throw ERROR_EXCEPTION << "error opening file " << m_putFileInfo.m_name.string() << ": " << e.what();
        }

        m_putFileInfo.m_compress = compressFile(m_putFileInfo.m_name);
        if (m_putFileInfo.m_compress) {
            m_putFileInfo.m_zFlate.reset(new Zflate(Zflate::COMPRESS));
        }
    }

    if (FIO::FIO_SUCCESS != m_putFileInfo.m_fio.error()) {
        m_requestTelemetryData.SetRequestFailure(RequestFailure_FileBadState);
        throw ERROR_EXCEPTION << "file is open but in a bad state " << m_putFileInfo.m_name.string() << ": " << m_putFileInfo.m_fio.errorAsString();
    }

    m_putFileInfo.m_bytesLeft = m_putFileInfo.m_totalBytesLeftToRead;

    tagValue_t::iterator offsetTagValue(m_requestInfo.m_params.find(HTTP_PARAM_TAG_OFFSET));
    if (m_requestInfo.m_params.end() != offsetTagValue) {
        try {
            FIO::offset_t offset = boost::lexical_cast<FIO::offset_t>((*offsetTagValue).second);
            m_putFileInfo.m_fio.seek(offset, SEEK_SET);
        } catch (std::exception const& e) {
            m_requestTelemetryData.SetRequestFailure(RequestFailure_FileInvalidOffset);
            throw ERROR_EXCEPTION << "invalid offset " << (*offsetTagValue).second << ": " << e.what();
        }
    }

    // need to write any remaining bytes in the buffer
    // to the file before starting the async handling
    if (m_putFileInfo.m_idx < m_requestInfo.m_bufferLen) {
        if (writePutFileData(&m_putFileInfo.m_buffer[m_putFileInfo.m_idx], m_requestInfo.m_bufferLen - m_putFileInfo.m_idx) < 0) {
            m_requestTelemetryData.SetRequestFailure(RequestFailure_FileWriteFailed);
            throw ERROR_EXCEPTION << m_putFileInfo.m_name.string() << " write file error: " << m_putFileInfo.m_fio.errorAsString();
        }
    }

    logRequestProcessing();
    if (m_putFileInfo.m_totalBytesLeftToRead > 0) {
        resetGuard.dismiss();
        sessionLogoutGuard.dismiss();
        asyncPutFile();
    } else {
        // have all the data no need for async processing just end
        char buffer[1]; // needed for compression
        if (writePutFileData(buffer, 0) < 0) {
            m_requestTelemetryData.SetRequestFailure(RequestFailure_FileWriteFailed);
            throw ERROR_EXCEPTION << m_putFileInfo.m_name.string() << " write file error: " << m_putFileInfo.m_fio.errorAsString();
        }

        if (!m_putFileInfo.m_fio.is_open() || 0 == m_putFileInfo.m_fio.tell()) {
            CXPS_LOG_WARNING(AT_LOC << "0 size file detected: " << m_putFileInfo.m_name.string());
        }
        resetGuard.dismiss();
        sessionLogoutGuard.dismiss();
        putFileEnd();
    }
}