WdtTransferRequest.cpp (448 lines of code) (raw):

/**
 * Copyright (c) 2014-present, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */
#include <wdt/WdtTransferRequest.h>
#include <folly/Conv.h>
#include <folly/Range.h>
#include <ctime>

using namespace std;
using folly::StringPiece;

namespace facebook {
namespace wdt {

/// Builds a WdtUri by parsing `url`; the parse result is available through
/// getErrorCode().
WdtUri::WdtUri(const string& url) {
  errorCode_ = process(url);
}

/// Sets the host name used by generateUrl().
void WdtUri::setHostName(const string& hostName) {
  hostName_ = hostName;
}

/// Sets the port used by generateUrl() (only emitted when positive).
void WdtUri::setPort(int32_t port) {
  port_ = port;
}

/// Adds or overwrites the query parameter `key` with `value`.
void WdtUri::setQueryParam(const string& key, const string& value) {
  queryParams_[key] = value;
}

/// Returns the status recorded by the last call to process().
ErrorCode WdtUri::getErrorCode() const {
  return errorCode_;
}

/// Converts a nibble (0-15) to its lowercase hex character.
/* static */
char WdtUri::toHex(unsigned char v) {
  WDT_CHECK_LT(v, 16);
  if (v <= 9) {
    return '0' + v;
  }
  return 'a' + v - 10;
}

/// Converts a hex character to its value.
/// Only 0-9 and lowercase a-f are accepted; returns -1 for anything else.
/* static */
int WdtUri::fromHex(char c) {
  if (c < '0' || (c > '9' && (c < 'a' || c > 'f'))) {
    return -1;  // invalid not 0-9a-f hex char
  }
  if (c <= '9') {
    return c - '0';
  }
  return c - 'a' + 10;
}

/// Percent-encodes `binaryStr`: every byte that is not alphanumeric or one of
/// , : . _ + - is replaced by '%' followed by two lowercase hex digits.
/* static */
string WdtUri::escape(const string& binaryStr) {
  string res;
  res.reserve(binaryStr.length());  // most time nothing to escape
  for (unsigned char c : binaryStr) {
    // Allow 0-9 A-Z a-z (alphanum) and , : . _ + - (note that : is arguable)
    // (and we could use an array lookup instead of a bunch of ||s)
    if (c == ',' || c == ':' || c == '.' || c == '_' || c == '+' || c == '-' ||
        (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') ||
        (c >= '0' && c <= '9')) {
      res.push_back(c);
    } else {
      res.push_back('%');
      res.push_back(toHex(c >> 4));
      res.push_back(toHex(c & 0xf));
    }
  }
  return res;
}

/// Decodes the percent-encoding in `escapedValue`, appending the decoded bytes
/// to `res`.
/// @return false (after logging) on a truncated or non 0-9a-f % sequence; in
///         that case `res` holds whatever was decoded before the error.
/* static */
bool WdtUri::unescape(string& res, StringPiece escapedValue) {
  res.reserve(res.length() + escapedValue.size());
  for (size_t i = 0; i < escapedValue.size(); ++i) {
    char c = escapedValue[i];
    if (c != '%') {
      res.push_back(c);
      continue;
    }
    i += 2;
    // Make sure there room for both hex
    if (i >= escapedValue.size()) {
      WLOG(ERROR) << "Can't decode \"" << escapedValue
                  << "\" end with unfinished % sequence";
      return false;
    }
    const int msb = fromHex(escapedValue[i - 1]);
    const int lsb = fromHex(escapedValue[i]);
    if (msb < 0 || lsb < 0) {
      WLOG(ERROR) << "Can't decode \"" << escapedValue
                  << "\" % sequence with non 0-9a-f characters";
      return false;
    }
    res.push_back(msb << 4 | lsb);
  }
  return true;
}

/// Serializes this uri as WDT_URL_PREFIX + host [+ ":" port] [+ "?" params].
/// Query parameters with an empty value are omitted; values are escaped.
string WdtUri::generateUrl() const {
  string url = WDT_URL_PREFIX;
  if (hostName_.find(':') != string::npos) {
    // Enclosing ipv6 address by [] so that it can be escaped
    folly::toAppend('[', hostName_, ']', &url);
  } else {
    folly::toAppend(hostName_, &url);
  }
  if (port_ > 0) {
    // Must have positive port value
    folly::toAppend(":", port_, &url);
  }
  char prefix = '?';
  for (const auto& pair : queryParams_) {
    if (!pair.second.empty()) {
      const string value = WdtUri::escape(pair.second);
      folly::toAppend(prefix, pair.first, "=", value, &url);
      prefix = '&';
    }
  }
  return url;
}

/// Parses `url` into host name, port and query parameters.
/// Expected shape: WDT_URL_PREFIX host[:port][?key=value[&key=value...]]
/// where host may be an ipv6 address enclosed in [].
/// @return OK on success, URI_PARSE_ERROR otherwise. Parsing continues after
///         some errors (empty host, bad port, bad escape) so that as much
///         state as possible is filled in.
ErrorCode WdtUri::process(const string& url) {
  if (url.size() < WDT_URL_PREFIX.size()) {
    WLOG(ERROR) << "Url doesn't specify wdt protocol";
    return URI_PARSE_ERROR;
  }
  StringPiece urlPiece(url, 0, WDT_URL_PREFIX.size());
  StringPiece wdtPrefix(WDT_URL_PREFIX);
  if (urlPiece != wdtPrefix) {
    WLOG(ERROR) << "Url does not specify wdt protocol " << url;
    return URI_PARSE_ERROR;
  }
  urlPiece = StringPiece(url, WDT_URL_PREFIX.size());
  if (urlPiece.empty()) {
    WLOG(ERROR) << "Empty host name " << url;
    return URI_PARSE_ERROR;
  }
  ErrorCode status = OK;
  // Parse hostname
  if (urlPiece[0] == '[') {
    urlPiece.advance(1);
    const size_t hostNameEnd = urlPiece.find(']');
    if (hostNameEnd == string::npos) {
      WLOG(ERROR) << "Didn't find ] for ipv6 address " << url;
      return URI_PARSE_ERROR;
    }
    // (pointer, count) overload: copies exactly the host characters without
    // first materializing the rest of the url as a temporary string
    hostName_.assign(urlPiece.data(), hostNameEnd);
    urlPiece.advance(hostNameEnd + 1);
  } else {
    // Host name runs up to the port (':') or params ('?') delimiter
    size_t urlIndex = 0;
    for (; urlIndex < urlPiece.size(); ++urlIndex) {
      if (urlPiece[urlIndex] == ':') {
        break;
      }
      if (urlPiece[urlIndex] == '?') {
        break;
      }
    }
    hostName_.assign(urlPiece.data(), urlIndex);
    urlPiece.advance(urlIndex);
  }
  if (hostName_.empty()) {
    status = URI_PARSE_ERROR;
    WLOG(ERROR) << "Empty hostname " << url;
  }
  if (urlPiece.empty()) {
    return status;
  }
  // TODO: allow for '/' like wdt://[::1]:1234/?num_ports=3
  // parse port number
  if (urlPiece[0] == ':') {
    urlPiece.advance(1);
    size_t paramsIndex = urlPiece.find('?');
    if (paramsIndex == string::npos) {
      paramsIndex = urlPiece.size();
    }
    try {
      const string portStr(urlPiece.data(), paramsIndex);
      port_ = folly::to<int32_t>(portStr);
    } catch (const std::exception&) {
      WLOG(ERROR) << "Invalid port, can't be parsed " << url;
      status = URI_PARSE_ERROR;
    }
    urlPiece.advance(paramsIndex);
  }
  if (urlPiece.empty()) {
    return status;
  }
  if (urlPiece[0] != '?') {
    WLOG(ERROR) << "Unexpected delimiter for params " << urlPiece[0];
    return URI_PARSE_ERROR;
  }
  urlPiece.advance(1);
  // parse params
  while (!urlPiece.empty()) {
    // Cut the next '&' separated segment off the front of urlPiece
    const size_t pairEnd = urlPiece.find('&');
    StringPiece keyValuePair;
    if (pairEnd == string::npos) {
      // Last key value pair
      keyValuePair = urlPiece;
      urlPiece.advance(urlPiece.size());
    } else {
      keyValuePair = StringPiece(urlPiece.data(), pairEnd);
      urlPiece.advance(pairEnd + 1);
    }
    if (keyValuePair.empty()) {
      // Empty segment (leading or consecutive '&'): skip it instead of
      // swallowing all the remaining pairs into a single value
      continue;
    }
    StringPiece key = keyValuePair.split_step('=');
    StringPiece value = keyValuePair;
    if (key.empty()) {
      // Value can be empty but key can't be empty
      WLOG(ERROR) << "Errors parsing params, url = " << url;
      status = URI_PARSE_ERROR;
      break;
    }
    string unescapedValue;
    if (!unescape(unescapedValue, value)) {
      status = URI_PARSE_ERROR;
    }
    // Stored even on unescape failure (holds the partially decoded value)
    queryParams_[key.toString()] = unescapedValue;
  }
  return status;
}

/// Returns the parsed/assigned host name (without [] for ipv6).
string WdtUri::getHostName() const {
  return hostName_;
}

/// Returns the parsed/assigned port.
int32_t WdtUri::getPort() const {
  return port_;
}

/// Returns the value of query parameter `key`, or an empty string (after a
/// WVLOG(1) message) when the key is absent.
string WdtUri::getQueryParam(const string& key) const {
  auto it = queryParams_.find(key);
  if (it == queryParams_.end()) {
    WVLOG(1) << "Couldn't find query param " << key;
    return "";
  }
  return it->second;
}

/// Returns all the query parameters.
const map<string, string>& WdtUri::getQueryParams() const {
  return queryParams_;
}

/// Resets host name, port (to -1) and query parameters. The error code is
/// left untouched.
void WdtUri::clear() {
  hostName_.clear();
  port_ = -1;
  queryParams_.clear();
}

/// Clears the current state and re-parses from `url`.
WdtUri& WdtUri::operator=(const string& url) {
  clear();
  errorCode_ = process(url);
  return *this;
}

/// Returns the `numPorts` consecutive ports starting at `startPort`
/// (empty when numPorts <= 0).
/* static */
std::vector<int32_t> WdtTransferRequest::genPortsVector(int32_t startPort,
                                                        int32_t numPorts) {
  std::vector<int32_t> ports;
  if (numPorts > 0) {
    ports.reserve(numPorts);  // single allocation
  }
  for (int32_t i = 0; i < numPorts; i++) {
    ports.push_back(startPort + i);
  }
  return ports;
}

const string WdtTransferRequest::TRANSFER_ID_PARAM{"id"};
/** RECeiver's Protocol Version */
const string WdtTransferRequest::RECEIVER_PROTOCOL_VERSION_PARAM{"recpv"};
const string WdtTransferRequest::DIRECTORY_PARAM{"dir"};
const string WdtTransferRequest::PORTS_PARAM{"ports"};
const string WdtTransferRequest::START_PORT_PARAM{"start_port"};
const string WdtTransferRequest::NUM_PORTS_PARAM{"num_ports"};
const string WdtTransferRequest::ENCRYPTION_PARAM{"Enc"};
const string WdtTransferRequest::TLS_PARAM{"tls"};
const string WdtTransferRequest::NAMESPACE_PARAM{"ns"};
const string WdtTransferRequest::DEST_IDENTIFIER_PARAM{"dstid"};
const string WdtTransferRequest::DOWNLOAD_RESUMPTION_PARAM{"dr"};
const string WdtTransferRequest::IV_CHANGE_INTERVAL_PARAM{"iv_change_int"};

/// Builds a request for `directory` with `numPorts` ports.
/// With a non zero `startPort` the ports are startPort, startPort + 1, ...;
/// with a zero `startPort` every entry is 0.
WdtTransferRequest::WdtTransferRequest(int startPort, int numPorts,
                                       const string& directory) {
  this->directory = directory;
  if (numPorts > 0) {
    ports.reserve(numPorts);  // single allocation
  }
  int portNum = startPort;
  for (int i = 0; i < numPorts; i++) {
    ports.push_back(portNum);
    if (startPort) {
      ++portNum;
    }
  }
}

/// Builds a request from a wdt url. Any problem is recorded in `errorCode`
/// (and logged) rather than thrown.
/// Ports come from the PORTS_PARAM list when present, otherwise from the
/// uri port (or START_PORT_PARAM) together with NUM_PORTS_PARAM.
WdtTransferRequest::WdtTransferRequest(const string& uriString) {
  WdtUri wdtUri(uriString);
  errorCode = wdtUri.getErrorCode();
  hostName = wdtUri.getHostName();
  transferId = wdtUri.getQueryParam(TRANSFER_ID_PARAM);
  wdtNamespace = wdtUri.getQueryParam(NAMESPACE_PARAM);
  destIdentifier = wdtUri.getQueryParam(DEST_IDENTIFIER_PARAM);
  directory = wdtUri.getQueryParam(DIRECTORY_PARAM);
  const string encStr = wdtUri.getQueryParam(ENCRYPTION_PARAM);
  if (!encStr.empty()) {
    ErrorCode code = EncryptionParams::unserialize(encStr, encryptionData);
    if (code != OK) {
      WLOG(ERROR) << "Unable to parse encryption data from \"" << encStr
                  << "\" " << errorCodeToStr(code);
      errorCode = getMoreInterestingError(code, errorCode);
    }
  }
  const string tlsEnabled = wdtUri.getQueryParam(TLS_PARAM);
  try {
    if (!tlsEnabled.empty()) {
      tls = folly::to<bool>(tlsEnabled);
    }
  } catch (const std::exception& e) {
    WLOG(ERROR) << "Error parsing tls " << tlsEnabled << " " << e.what();
    errorCode = URI_PARSE_ERROR;
  }
  const string downloadResume =
      wdtUri.getQueryParam(DOWNLOAD_RESUMPTION_PARAM);
  try {
    if (!downloadResume.empty()) {
      downloadResumptionEnabled = folly::to<bool>(downloadResume);
    }
  } catch (const std::exception& e) {
    WLOG(ERROR) << "Error parsing download resume " << downloadResume << " "
                << e.what();
    errorCode = URI_PARSE_ERROR;
  }
  const string recpv = wdtUri.getQueryParam(RECEIVER_PROTOCOL_VERSION_PARAM);
  if (recpv.empty()) {
    WLOG(WARNING) << RECEIVER_PROTOCOL_VERSION_PARAM << " not specified in URI";
  } else {
    try {
      protocolVersion = folly::to<int64_t>(recpv);
    } catch (const std::exception& e) {
      WLOG(ERROR) << "Error parsing protocol version " << recpv << " "
                  << e.what();
      errorCode = URI_PARSE_ERROR;
    }
  }
  const string ivChangeIntervalStr =
      wdtUri.getQueryParam(IV_CHANGE_INTERVAL_PARAM);
  if (ivChangeIntervalStr.empty()) {
    WLOG(WARNING) << IV_CHANGE_INTERVAL_PARAM << " not specified in URI";
  } else {
    try {
      ivChangeInterval = folly::to<int64_t>(ivChangeIntervalStr);
    } catch (const std::exception& e) {
      WLOG(ERROR) << "Error parsing iv change interval " << ivChangeIntervalStr
                  << " " << e.what();
      errorCode = URI_PARSE_ERROR;
    }
  }
  string portsStr(wdtUri.getQueryParam(PORTS_PARAM));
  StringPiece portsList(portsStr);  // pointers into portsStr
  do {
    StringPiece portNum = portsList.split_step(',');
    if (!portNum.empty()) {
      try {
        // Nothing is pushed when the conversion throws
        ports.push_back(folly::to<int32_t>(portNum));
      } catch (const std::exception&) {
        WLOG(ERROR) << "Couldn't convert " << portNum
                    << " to valid port number";
        errorCode = URI_PARSE_ERROR;
      }
    }
  } while (!portsList.empty());
  if (!ports.empty()) {
    // Done with ports - rest of the function is alternative way to set ports
    return;
  }
  // Figure out ports using other params only if there was no port list
  const string startPortStr = wdtUri.getQueryParam(START_PORT_PARAM);
  const string numPortsStr = wdtUri.getQueryParam(NUM_PORTS_PARAM);
  int32_t startPort = wdtUri.getPort();
  if (startPort <= 0) {
    if (startPortStr.empty()) {
      WLOG(ERROR) << "URI should have port or " << START_PORT_PARAM;
      errorCode = INVALID_REQUEST;
    } else {
      try {
        startPort = folly::to<int32_t>(startPortStr);
      } catch (const std::exception&) {
        WLOG(ERROR) << "Couldn't convert start port " << startPortStr;
        errorCode = URI_PARSE_ERROR;
      }
    }
  }
  int32_t numPorts = 0;
  if (numPortsStr.empty()) {
    WLOG(ERROR) << "URI should have " << NUM_PORTS_PARAM;
    errorCode = INVALID_REQUEST;
  } else {
    try {
      numPorts = folly::to<int32_t>(numPortsStr);
    } catch (const std::exception&) {
      WLOG(ERROR) << "Couldn't convert num ports " << numPortsStr;
      errorCode = URI_PARSE_ERROR;
    }
  }
  if (errorCode == OK) {
    ports = WdtTransferRequest::genPortsVector(startPort, numPorts);
  }
  // beware of the return above, add future params processing above the return
}

/// Returns the url for this request with the encryption data in its
/// url-safe (non log-safe) form and without the directory.
string WdtTransferRequest::genWdtUrlWithSecret() const {
  return generateUrlInternal(false, false);
}

/// Returns the url for this request with the directory included and the
/// encryption data in its log-safe form.
string WdtTransferRequest::getLogSafeString() const {
  return generateUrlInternal(true, true);
}

/// Serializes this request as a wdt url.
/// @param genFull     when true the directory is included
/// @param forLogging  when true the encryption data is emitted through
///                    getLogSafeString() instead of getUrlSafeString()
/// @return the url, or the error string when the request is in error
string WdtTransferRequest::generateUrlInternal(bool genFull,
                                               bool forLogging) const {
  if (errorCode != OK) {
    const string msg = errorCodeToStr(errorCode);
    WLOG(ERROR) << "Transfer request has " << msg;
    return msg;
  }
  WdtUri wdtUri;
  wdtUri.setHostName(hostName);
  wdtUri.setQueryParam(TRANSFER_ID_PARAM, transferId);
  wdtUri.setQueryParam(NAMESPACE_PARAM, wdtNamespace);
  wdtUri.setQueryParam(DEST_IDENTIFIER_PARAM, destIdentifier);
  wdtUri.setQueryParam(RECEIVER_PROTOCOL_VERSION_PARAM,
                       folly::to<string>(protocolVersion));
  wdtUri.setQueryParam(IV_CHANGE_INTERVAL_PARAM,
                       folly::to<string>(ivChangeInterval));
  if (downloadResumptionEnabled) {
    wdtUri.setQueryParam(DOWNLOAD_RESUMPTION_PARAM,
                         folly::to<string>(downloadResumptionEnabled));
  }
  serializePorts(wdtUri);
  if (genFull) {
    wdtUri.setQueryParam(DIRECTORY_PARAM, directory);
  }
  if (encryptionData.isSet()) {
    WVLOG(1) << "Encryption data is set " << encryptionData.getLogSafeString();
    wdtUri.setQueryParam(ENCRYPTION_PARAM,
                         forLogging ? encryptionData.getLogSafeString()
                                    : encryptionData.getUrlSafeString());
  }
  wdtUri.setQueryParam(TLS_PARAM, folly::to<string>(tls));
  return wdtUri.generateUrl();
}

/// Stores the ports in `wdtUri`: as uri port + NUM_PORTS_PARAM when they form
/// a contiguous increasing sequence, as a PORTS_PARAM list otherwise.
/// Does nothing when there are no ports.
void WdtTransferRequest::serializePorts(WdtUri& wdtUri) const {
  // Serialize to a port list if the ports are not
  // contiguous sequence else wdt://hostname:port
  if (ports.empty()) {
    return;
  }
  int32_t prevPort = ports[0];
  bool hasHoles = false;
  for (size_t i = 1; i < ports.size(); i++) {
    if (ports[i] != prevPort + 1) {
      hasHoles = true;
      break;
    }
    prevPort = ports[i];
  }
  if (hasHoles) {
    wdtUri.setQueryParam(PORTS_PARAM, getSerializedPortsList());
  } else {
    wdtUri.setPort(ports[0]);
    wdtUri.setQueryParam(NUM_PORTS_PARAM, folly::to<string>(ports.size()));
  }
}

/// Returns the ports as a comma separated list (empty when no ports).
string WdtTransferRequest::getSerializedPortsList() const {
  string portsList = "";
  for (size_t i = 0; i < ports.size(); i++) {
    if (i != 0) {
      folly::toAppend(",", &portsList);
    }
    auto port = ports[i];
    folly::toAppend(port, &portsList);
  }
  return portsList;
}

/// Compares transfer id, protocol version, directory, host name, ports,
/// encryption data, destination identifier and namespace.
bool WdtTransferRequest::operator==(const WdtTransferRequest& that) const {
  bool result = (transferId == that.transferId) &&
                (protocolVersion == that.protocolVersion) &&
                (directory == that.directory) && (hostName == that.hostName) &&
                (ports == that.ports) &&
                (encryptionData == that.encryptionData) &&
                (destIdentifier == that.destIdentifier) &&
                (wdtNamespace == that.wdtNamespace);
  // No need to check the file info, simply checking whether two objects
  // are same with respect to the wdt settings
  return result;
}

/// Streams the log-safe url representation of `req`.
std::ostream& operator<<(std::ostream& os, const WdtTransferRequest& req) {
  // getLogSafeString() returns a url string containing all fields. So should be
  // enough for logging purpose.
  os << "WdtTransferRequest represented by url: " << req.getLogSafeString();
  return os;
}
}  // namespace wdt
}  // namespace facebook