in host/cxpslib/requesthandler.cpp [884:1147]
void RequestHandler::putFileGetData()
{
SCOPE_GUARD sessionLogoutGuard = MAKE_SCOPE_GUARD(boost::bind(&RequestHandler::sessionLogout, this));
SCOPE_GUARD resetGuard = MAKE_SCOPE_GUARD(boost::bind(&RequestHandler::reset, this));
tagValue_t::iterator moreDataTagValue(m_requestInfo.m_params.find(HTTP_PARAM_TAG_MORE_DATA));
if (m_requestInfo.m_params.end() == moreDataTagValue) {
m_requestTelemetryData.SetRequestFailure(RequestFailure_PutFileMissingMoreData);
badRequest(AT_LOC, "missing moredata");
return;
}
m_putFileInfo.m_moreData = ('1' == (*moreDataTagValue).second[0]);
tagValue_t::iterator createDirsTagValue(m_requestInfo.m_params.find(HTTP_PARAM_TAG_CREATE_DIRS));
if (m_requestInfo.m_params.end() == createDirsTagValue) {
m_putFileInfo.m_createDirs = false;
} else {
m_putFileInfo.m_createDirs = ('1' == (*createDirsTagValue).second[0]);
}
tagValue_t::iterator fileNameTagValue(m_requestInfo.m_params.find(HTTP_PARAM_TAG_NAME));
if (m_requestInfo.m_params.end() == fileNameTagValue) {
m_requestTelemetryData.SetRequestFailure(RequestFailure_MissingFileName);
badRequest(AT_LOC, "missing name");
return;
}
std::string& fileName = (*fileNameTagValue).second;
// Defering the update the file name to telemetry as soon as parsed. If the request telemetry
// data has been pended from the last putfile(moredata=true), then overwriting the file name here
// and on failing with PutfileInProgress would end up uploading the data for the new file.
std::string ver;
if (!getVersion(ver)) {
m_requestTelemetryData.SetRequestFailure(RequestFailure_MissingVer);
badRequest(AT_LOC, "missing ver");
return;
}
tagValue_t::iterator idTagValue(m_requestInfo.m_params.find(HTTP_PARAM_TAG_ID));
if (m_requestInfo.m_params.end() == idTagValue) {
m_requestTelemetryData.SetRequestFailure(RequestFailure_MissingId);
badRequest(AT_LOC, "missing id");
return;
}
tagValue_t::iterator reqIdTagValue(m_requestInfo.m_params.find(HTTP_PARAM_TAG_REQ_ID));
if (m_requestInfo.m_params.end() == reqIdTagValue) {
m_requestTelemetryData.SetRequestFailure(RequestFailure_MissingReqId);
badRequest(AT_LOC, "missing reqid");
return;
}
boost::uint32_t reqId = boost::lexical_cast<boost::uint32_t>((*reqIdTagValue).second);
if (reqId <= m_reqId) {
m_requestTelemetryData.SetRequestFailure(RequestFailure_InvalidReqId);
badRequest(AT_LOC, "invalid request id");
return;
}
bool enableDiffResyncThrottle = true;
tagValue_t::iterator filetypeTagValue(m_requestInfo.m_params.find(HTTP_PARAM_TAG_FILETYPE));
if (m_requestInfo.m_params.end() == filetypeTagValue) {
//CXPS_LOG_WARNING("filetype not sent. Diff and resync throttle will not work");
enableDiffResyncThrottle = false;
}
else if(filetypeTagValue->second.empty())
{
//CXPS_LOG_WARNING("invalid filetype");
enableDiffResyncThrottle = false;
}
else
{
m_putFileInfo.m_filetype = (CxpsTelemetry::FileType)boost::lexical_cast<int>(filetypeTagValue->second);
//CXPS_LOG_ERROR("Filetype : " << m_putFileInfo.m_filetype);
}
tagValue_t::iterator diskidTagValue(m_requestInfo.m_params.find(HTTP_PARAM_TAG_DISKID));
if (m_requestInfo.m_params.end() == diskidTagValue) {
//CXPS_LOG_WARNING("diskid not sent. Diff and resync throttle will not work");
enableDiffResyncThrottle = false;
}
else if (diskidTagValue->second.empty())
{
//CXPS_LOG_WARNING("invalid diskid");
enableDiffResyncThrottle = false;
}
else
{
m_putFileInfo.m_deviceId = diskidTagValue->second;
boost::algorithm::to_lower(m_putFileInfo.m_deviceId);
//CXPS_LOG_ERROR("diskid : " << m_putFileInfo.m_deviceId);
}
if (!Authentication::verifyPutFileId(m_hostId,
m_serverOptions->password(),
HTTP_METHOD_POST,
HTTP_REQUEST_PUTFILE,
m_cnonce,
m_sessionId,
m_snonce,
fileName,
(m_putFileInfo.m_moreData ? '1' : '0'),
ver,
reqId,
(*idTagValue).second)) {
m_requestTelemetryData.SetRequestFailure(RequestFailure_VerifPutFile);
std::string msg("invalid id for host: ");
msg += m_hostId;
msg += " request: ";
msg += HTTP_REQUEST_PUTFILE;
badRequest(AT_LOC, msg.c_str());
return;
}
m_reqId = reqId;
std::replace(fileName.begin(), fileName.end(), '\\', '/');
boost::filesystem::path fullPathName;
getFullPathName(fileName, fullPathName);
if (!m_putFileInfo.m_name.empty() && fullPathName != m_putFileInfo.m_name) {
std::stringstream errStr;
errStr << "putfile request for " << fullPathName
<< " but current putfile indicates " << m_putFileInfo.m_name << " is being processed";
if (m_putFileInfo.m_moreData) {
// if this throws, then it is most likely request error
if (putFileInProgress(errStr.str().c_str())) {
return;
}
}
m_requestTelemetryData.SetRequestFailure(RequestFailure_PutFileExpectedMoreData);
// VERIFY: if we get here, then it means there is no more data for the current
// putfile, but some how still have info about the last putfile. this should not happen
// so for now treat it like an error. we could just assume a reset was missed
// do the reset and continue with this request
throw ERROR_EXCEPTION << errStr.str() << " but more data is false";
}
m_requestTelemetryData.AcquiredFilePath(fileName);
// create the paths before checking throttle
if (!fullPathName.empty() && (m_putFileInfo.m_createDirs || m_serverOptions->createPaths())) {
CreatePaths::createPathsAsNeeded(fullPathName);
}
// Throttling should work only for diff and resync files, and not for other files
// To prevent other types of files from being throttled, we should only throttle files with extension .dat
// To do: sadewang-1912
// Change this logic to identify file types based on headers and not extension
if (!fullPathName.empty() && fullPathName.has_extension() && boost::iequals(fullPathName.extension().string(), ".dat"))
{
checkForThrottle(enableDiffResyncThrottle, fullPathName);
if (m_putFileInfo.m_isCumulativeThrottled || m_putFileInfo.m_isDiffThrottled || m_putFileInfo.m_isResyncThrottled)
{
setThrottleRequestFailureInTelemetry();
m_putFileInfo.m_bytesLeft = m_putFileInfo.m_totalBytesLeftToRead;
resetGuard.dismiss();
sessionLogoutGuard.dismiss();
if (m_putFileInfo.m_bytesLeft > 0)
{
ReadEntireDataFromSocketAsync();
}
else
{
ReadEntireDataAndSendThrottle();
}
return;
}
}
if (m_putFileInfo.m_fio.is_open()) {
m_putFileInfo.m_openFileIsNeeded = false;
} else {
m_putFileInfo.m_openFileIsNeeded = true;
m_putFileInfo.m_sentName = fileName;
{
// Acquire this mutex before changing m_putFileInfo.m_name
boost::mutex::scoped_lock guard(m_interSessionCommunicationMutex);
m_putFileInfo.m_name = fullPathName;
}
// CHECK: is the good enough?
extendedLengthPath_t extName(ExtendedLengthPath::name(m_putFileInfo.m_name.string()));
if (boost::filesystem::exists(extName)) {
std::string sessionId = g_sessionTracker->checkOpenFile(fullPathName, false, true);
if (!sessionId.empty()) {
m_requestTelemetryData.SetRequestFailure(RequestFailure_FileOpenInSession);
m_putFileInfo.m_name.clear(); // this prevents this session from deleting the putfile while another session is using it
throw ERROR_EXCEPTION << "(sid: " << m_sessionId << ") putfile "
<< m_putFileInfo.m_name.string() << " currently opened by session "
<< sessionId << " can not be opened by multiple sessions";
}
CXPS_LOG_WARNING(AT_LOC << "(sid: " << m_sessionId << ") putfile " << m_putFileInfo.m_name.string()
<< " already exists. it will be overwritten");
}
try {
if (!m_putFileInfo.m_fio.open(extName.string().c_str(), FIO::FIO_OVERWRITE)) {
m_requestTelemetryData.SetRequestFailure(RequestFailure_FileOpenFailed);
throw ERROR_EXCEPTION << "error opening file " << m_putFileInfo.m_name.string() << ": " << m_putFileInfo.m_fio.errorAsString();
}
} catch (std::exception const& e) {
m_requestTelemetryData.SetRequestFailure(RequestFailure_FileOpenFailed);
throw ERROR_EXCEPTION << "error opening file " << m_putFileInfo.m_name.string() << ": " << e.what();
}
m_putFileInfo.m_compress = compressFile(m_putFileInfo.m_name);
if (m_putFileInfo.m_compress) {
m_putFileInfo.m_zFlate.reset(new Zflate(Zflate::COMPRESS));
}
}
if (FIO::FIO_SUCCESS != m_putFileInfo.m_fio.error()) {
m_requestTelemetryData.SetRequestFailure(RequestFailure_FileBadState);
throw ERROR_EXCEPTION << "file is open but in a bad state " << m_putFileInfo.m_name.string() << ": " << m_putFileInfo.m_fio.errorAsString();
}
m_putFileInfo.m_bytesLeft = m_putFileInfo.m_totalBytesLeftToRead;
tagValue_t::iterator offsetTagValue(m_requestInfo.m_params.find(HTTP_PARAM_TAG_OFFSET));
if (m_requestInfo.m_params.end() != offsetTagValue) {
try {
FIO::offset_t offset = boost::lexical_cast<FIO::offset_t>((*offsetTagValue).second);
m_putFileInfo.m_fio.seek(offset, SEEK_SET);
} catch (std::exception const& e) {
m_requestTelemetryData.SetRequestFailure(RequestFailure_FileInvalidOffset);
throw ERROR_EXCEPTION << "invalid offset " << (*offsetTagValue).second << ": " << e.what();
}
}
// need to write any remaining bytes in the buffer
// to the file before starting the async handling
if (m_putFileInfo.m_idx < m_requestInfo.m_bufferLen) {
if (writePutFileData(&m_putFileInfo.m_buffer[m_putFileInfo.m_idx], m_requestInfo.m_bufferLen - m_putFileInfo.m_idx) < 0) {
m_requestTelemetryData.SetRequestFailure(RequestFailure_FileWriteFailed);
throw ERROR_EXCEPTION << m_putFileInfo.m_name.string() << " write file error: " << m_putFileInfo.m_fio.errorAsString();
}
}
logRequestProcessing();
if (m_putFileInfo.m_totalBytesLeftToRead > 0) {
resetGuard.dismiss();
sessionLogoutGuard.dismiss();
asyncPutFile();
} else {
// have all the data no need for async processing just end
char buffer[1]; // needed for compression
if (writePutFileData(buffer, 0) < 0) {
m_requestTelemetryData.SetRequestFailure(RequestFailure_FileWriteFailed);
throw ERROR_EXCEPTION << m_putFileInfo.m_name.string() << " write file error: " << m_putFileInfo.m_fio.errorAsString();
}
if (!m_putFileInfo.m_fio.is_open() || 0 == m_putFileInfo.m_fio.tell()) {
CXPS_LOG_WARNING(AT_LOC << "0 size file detected: " << m_putFileInfo.m_name.string());
}
resetGuard.dismiss();
sessionLogoutGuard.dismiss();
putFileEnd();
}
}