adapter.cpp (415 lines of code) (raw):

#include "adapter.h" #include <sstream> #include <iomanip> #define NOMINMAX #define WIN32_LEAN_AND_MEAN #include <curl/curl.h> using namespace std; namespace aliyun_log_sdk_v6 { extern const char * const DATE_FORMAT_RFC822 = "%a, %d %b %Y %H:%M:%S GMT"; ///<RFC822 date formate, GMT time. static const uint32_t MD5_BYTES=16; extern const char* const LOG_HEADSIGNATURE_PREFIX = "LOG "; extern const char* const LOGE_REQUEST_ERROR = "RequestError"; extern const char* const LOGE_INVALID_HOST = "InvalidHost"; extern const char* const LOGE_UNKNOWN_ERROR = "UnknownError"; extern const char* const LOGE_NOT_IMPLEMENTED = "NotImplemented"; extern const char* const LOGE_SERVER_BUSY = "ServerBusy"; extern const char* const LOGE_INTERNAL_SERVER_ERROR = "InternalServerError"; extern const char* const LOGE_RESPONSE_SIG_ERROR = "ResponseSignatureError"; extern const char* const LOGE_PARAMETER_INVALID = "ParameterInvalid"; extern const char* const LOGE_MISSING_PARAMETER = "MissingParameter"; extern const char* const LOGE_INVALID_METHOD = "InvalidMethod"; extern const char* const LOGE_BAD_RESPONSE = "BadResponse"; extern const char* const LOGE_UNAUTHORIZED = "Unauthorized"; extern const char* const LOGE_QUOTA_EXCEED = "ExceedQuota"; extern const char* const LOGE_REQUEST_TIMEOUT = "RequestTimeout"; extern const char* const LOGE_CLIENT_OPERATION_TIMEOUT = "ClientOpertaionTimeout"; extern const char* const LOGE_CLIENT_NETWORK_ERROR = "ClientNetworkError"; extern const char* const LOGE_USER_NOT_EXIST = "UserNotExist"; extern const char* const LOGE_CATEGORY_NOT_EXIST = "CategoryNotExist"; extern const char* const LOGE_TOPIC_NOT_EXIST = "TopicNotExist"; extern const char* const LOGE_POST_BODY_INVALID = "PostBodyInvalid"; extern const char* const LOGE_INVALID_CONTENTTYPE = "InvalidContentType"; extern const char* const LOGE_INVALID_CONTENLENGTH = "InvalidContentLength"; extern const char* const LOGE_INVALID_APIVERSION = "InvalidAPIVersion"; extern const char* const LOGE_PROJECT_NOT_EXIST = "ProjectNotExist"; extern const char* const LOGE_LOGSTORE_NOT_EXIST = "LogStoreNotExist"; extern const char* const LOGE_INVALID_ACCESSKEYID = "InvalidAccessKeyId"; extern const char* const LOGE_SIGNATURE_NOT_MATCH = "SignatureNotMatch"; extern const char* const LOGE_PROJECT_FORBIDDEN = "ProjectForbidden"; extern const char* const LOGE_WRITE_QUOTA_EXCEED = "WriteQuotaExceed"; extern const char* const LOGE_READ_QUOTA_EXCEED = "ReadQuotaExceed"; extern const char* const LOGE_REQUEST_TIME_EXPIRED = "RequestTimeExpired"; extern const char* const LOGE_INVALID_REQUEST_TIME = "InvalidRequestTime"; extern const char* const LOGE_POST_BODY_TOO_LARGE = "PostBodyTooLarge"; extern const char* const LOGE_INVALID_TIME_RANGE = "InvalidTimeRange"; extern const char* const LOGE_INVALID_REVERSE = "InvalidReverse"; extern const char* const LOGE_LOGSTORE_WITHOUT_SHARD = "LogStoreWithoutShard"; extern const char* const LOGE_SHARD_WRITE_QUOTA_EXCEED = "ShardWriteQuotaExceed"; extern const char* const LOGE_SHARD_READ_QUOTA_EXCEED = "ShardReadQuotaExceed"; extern const char* const LOGSTORES = "/logstores"; extern const char* const SHARDS = "/shards"; extern const char* const INDEX = "/index"; extern const char* const CONFIGS = "/configs"; extern const char* const MACHINES = "/machines"; extern const char* const MACHINEGROUPS = "/machinegroups"; extern const char* const ACLS = "/acls"; extern const char* const HTTP_GET = "GET"; extern const char* const HTTP_POST = "POST"; extern const char* const HTTP_PUT = "PUT"; extern const char* const HTTP_DELETE = "DELETE"; extern const char* const HOST = "Host"; extern const char* const DATE = "Date"; extern const char* const USER_AGENT = "User-Agent"; extern const char* const LOG_HEADER_PREFIX = "x-log-"; extern const char* const ACS_HEADER_PREFIX = "x-acs-"; extern const char* const X_LOG_APIVERSION = "x-log-apiversion"; extern const char* const X_LOG_COMPRESSTYPE = "x-log-compresstype"; extern const char* const X_LOG_BODYRAWSIZE = "x-log-bodyrawsize"; extern const char* const X_LOG_SIGNATUREMETHOD = "x-log-signaturemethod"; extern const char* const X_ACS_SECURITY_TOKEN = "x-acs-security-token"; extern const char* const X_LOG_CURSOR = "x-log-cursor"; extern const char* const X_LOG_REQUEST_ID = "x-log-requestid"; extern const char* const X_LOG_PROGRESS = "x-log-progress"; extern const char* const X_LOG_COUNT = "x-log-count"; extern const char *const X_LOG_PROCESSED_ROWS = "x-log-processoed-rows"; extern const char *const X_LOG_ELASPED_MILLISECOND = "x-log-elapsed-millisecond"; extern const char *const X_LOG_CPU_SEC = "x-log-cpu-sec"; extern const char *const X_LOG_CPU_CORES = "x-log-cpu-cores"; extern const char* const HTTP_ACCEPT = "accept"; extern const char* const DEFLATE = "deflate"; extern const char* const HMAC_SHA1 = "hmac-sha1"; extern const char* const CONTENT_TYPE = "Content-Type"; extern const char* const CONTENT_LENGTH = "Content-Length"; extern const char* const CONTENT_MD5 = "Content-MD5"; extern const char* const AUTHORIZATION = "Authorization"; extern const char* const SIGNATURE = "Signature"; extern const char* const ACCEPT_ENCODING = "Accept-Encoding"; extern const char* const ENCONDING_GZIP = "gzip"; extern const char* const TYPE_LOG_PROTOBUF ="application/x-protobuf"; extern const char* const TYPE_LOG_JSON ="application/json"; extern const char* const LOGITEM_TIME_STAMP_LABEL="__time__"; extern const char* const LOGITEM_SOURCE_LABEL="__source__"; extern const char* const LOG_API_VERSION = "0.6.0"; extern const char* const LOG_TYPE_CURSOR = "cursor"; extern const char* const LOG_TYPE = "type"; extern const char* const LOGE_NOT_SUPPORTED_ACCEPT_CONTENT_TYPE = "InvalidAcceptContentType"; extern const char* const LOGE_NOT_SUPPORTED_ACCEPT_ENCODING = "InvalidAcceptEncoding"; extern const char* const LOGE_SHARD_NOT_EXIST = "ShardNotExist"; extern const char* const LOGE_INVALID_CURSOR = "InvalidCursor"; extern const char* const LOG_LZ4 = "lz4"; extern const char* const LOG_ERROR_CODE = "errorCode"; extern const char* const LOG_ERROR_MESSAGE = "errorMessage"; extern const char* const LOG_SHARD_STATUS_READWRITE= "readwrite"; extern const char* const LOG_SHARD_STATUS_READONLY = "readonly"; class BodyTransfer { public: explicit BodyTransfer(const std::string& data) : data(data) {} size_t read(char* ptr, size_t wanted) { size_t remains = std::max((size_t)0, data.size() - transfered); if (remains == 0) { return 0; } size_t readBytes = std::min(remains, wanted); std::memcpy(ptr, data.c_str() + transfered, readBytes); transfered += readBytes; return readBytes; } private: size_t transfered = 0; const std::string& data; }; static size_t sendBody(char *ptr, size_t size, size_t nmemb, void *userdata) { BodyTransfer *state = static_cast<BodyTransfer *>(userdata); const size_t wanted = size * nmemb; return state->read(ptr, wanted); } static std::string HexToString(const uint8_t md5[16]) { static const char* table = "0123456789ABCDEF"; std::string ss(32, 'a'); for (int i = 0; i < 16 ; ++i) { ss[i * 2] = table[md5[i] >> 4]; ss[i * 2 + 1] = table[md5[i] & 0x0F]; } return ss; } std::string CodecTool::CalcMD5(const std::string& message) { uint8_t md5[MD5_BYTES]; DoMd5((const uint8_t*)message.data(), message.length(), md5); return HexToString(md5); } std::string CodecTool::CalcSHA1(const std::string& message, const std::string& key) { HMAC hmac(reinterpret_cast<const uint8_t*>(key.data()), key.size()); hmac.add(reinterpret_cast<const uint8_t*>(message.data()), message.size()); return string(reinterpret_cast<const char*>(hmac.result()),SHA1_DIGEST_BYTES); } std::string CodecTool::Base64Enconde(const std::string& message) { std::istringstream iss(message); std::ostringstream oss; Base64Encoding(iss, oss); return oss.str(); } std::string CodecTool::ToGmtTime(std::time_t &t, const std::string& format) { std::stringstream date; std::tm tm; #ifdef _MSC_VER ::gmtime_s(&tm, &t); #else ::gmtime_r(&t, &tm); #endif #if defined(__GNUG__) && __GNUC__ < 5 static const char wday_name[][4] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat" }; static const char mon_name[][4] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; char tmbuff[26]; snprintf(tmbuff, sizeof(tmbuff), "%.3s, %.2d %.3s %d %.2d:%.2d:%.2d", wday_name[tm.tm_wday], tm.tm_mday, mon_name[tm.tm_mon], 1900 + tm.tm_year, tm.tm_hour, tm.tm_min, tm.tm_sec); date << tmbuff << " GMT"; #else date.imbue(std::locale::classic()); date << std::put_time(&tm, format.c_str()); #endif return date.str(); } std::string CodecTool::GetDateString(const std::string& dateFormat) { std::time_t t = std::time(nullptr); return ToGmtTime(t, dateFormat); } std::string CodecTool::GetDateString() { return GetDateString(DATE_FORMAT_RFC822); } bool CodecTool::StartWith(const std::string& input, const std::string& pattern) { if (input.length() < pattern.length()) { return false; } size_t i = 0; while (i < pattern.length() && input[i] == pattern[i]) { i++; } return i == pattern.length(); } string CodecTool::UrlEncode(const string& url) { char *szEncoded; szEncoded = curl_escape(url.c_str(), url.size()); string tmpStr = szEncoded; curl_free(szEncoded); return tmpStr; } void LOGAdapter::GetQueryString(const map<string, string>& parameterList, string &queryString) { queryString.clear(); for(map<string, string>::const_iterator iter=parameterList.begin(); iter!=parameterList.end(); ++iter) { if (iter != parameterList.begin()) { queryString.append("&"); } queryString.append(iter->first); queryString.append("="); queryString.append(CodecTool::UrlEncode(iter->second)); } } static size_t data_write_callback(char* buffer, size_t size, size_t nmemb, string* write_buf) { size_t sizes = size * nmemb; if (buffer == NULL) { return 0; } write_buf->append(buffer, sizes); return sizes; } static size_t header_write_callback(char* buffer, size_t size, size_t nmemb, map<string,string>* write_buf) { size_t sizes = size * nmemb; if (buffer == NULL) { return 0; } size_t colonIndex; for(colonIndex=1; colonIndex<sizes-2; colonIndex++) { if(buffer[colonIndex] == ':')break; } if(colonIndex<sizes-2) { unsigned long leftSpaceNum, rightSpaceNum; for(leftSpaceNum=0; leftSpaceNum<colonIndex-1; leftSpaceNum++) { if(buffer[colonIndex-leftSpaceNum-1] != ' ') break; } for(rightSpaceNum=0; rightSpaceNum<sizes-colonIndex-1-2; rightSpaceNum++) { if(buffer[colonIndex+rightSpaceNum+1] != ' ') break; } (*write_buf)[string(buffer,0,colonIndex-leftSpaceNum)] = string(buffer, colonIndex+1+rightSpaceNum, sizes-colonIndex-1-2-rightSpaceNum); } return sizes; } void LOGAdapter::Send(const string& httpMethod, const string& host, const int32_t port, const string& url, const string& queryString, const map<string, string>& header, const string& body, const int32_t timeout, HttpMessage& httpMessage, const int64_t maxspeed) { /* for(map<string, string>::const_iterator iter = header.begin();iter != header.end();iter++) { } cout << endl; cout << "HOST:" << host << "\n" << endl; //cout << "URI:" << url << "\n" << endl; //cout << "BODY:" << body << "\n" << endl; //cout << "QUERYSTRING:" << queryString << "\n" << endl; */ CURLcode res; std::string response; int64_t statusCode; map<std::string,std::string> responseHeader; struct curl_slist *headers = NULL; for(map<std::string, std::string>::const_iterator iter=header.begin(); iter!=header.end(); iter++) { headers = curl_slist_append(headers, (iter->first + ":" + iter->second).c_str()); } string queryUrl = host + url; if(queryString.empty() == false) queryUrl += "?"+queryString; CURL* curl = curl_easy_init(); if (curl) { if(headers) { curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); } curl_easy_setopt(curl, CURLOPT_URL, queryUrl.c_str()); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, data_write_callback); curl_easy_setopt(curl, CURLOPT_WRITEHEADER, &responseHeader); curl_easy_setopt(curl, CURLOPT_TCP_NODELAY, 1); curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_write_callback); curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeout); curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT); curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1); curl_easy_setopt(curl, CURLOPT_MAX_SEND_SPEED_LARGE, (curl_off_t)maxspeed); if (httpMethod == HTTP_POST) { curl_easy_setopt(curl, CURLOPT_POST, 1L); BodyTransfer transfer(body); curl_easy_setopt(curl, CURLOPT_READDATA, &transfer); curl_easy_setopt(curl, CURLOPT_READFUNCTION, sendBody); } else if (httpMethod == HTTP_DELETE) { curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, HTTP_DELETE); } else if (httpMethod == HTTP_PUT) { curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L); BodyTransfer transfer(body); curl_easy_setopt(curl, CURLOPT_READDATA, &transfer); curl_easy_setopt(curl, CURLOPT_READFUNCTION, sendBody); } res = curl_easy_perform(curl); #define CURL_CLEAN_UP curl_easy_cleanup(curl);\ if(headers)\ {\ curl_slist_free_all(headers);\ } switch(res) { case CURLE_OK: curl_easy_getinfo (curl, CURLINFO_RESPONSE_CODE, &statusCode); CURL_CLEAN_UP break; case CURLE_OPERATION_TIMEDOUT: CURL_CLEAN_UP throw LOGException(LOGE_REQUEST_TIMEOUT, "Request operation timeout."); break; case CURLE_COULDNT_CONNECT: CURL_CLEAN_UP throw LOGException(LOGE_CLIENT_NETWORK_ERROR, "Can not connect to server."); break; default: CURL_CLEAN_UP throw LOGException(LOGE_CLIENT_NETWORK_ERROR, std::string("Request operation failed.")+"CURL_ERROR_CODE:"+ToString(res)); break; } } else { throw LOGException(LOGE_UNKNOWN_ERROR, "Initailizing request failed."); } httpMessage = HttpMessage(statusCode,responseHeader, response); } string LOGAdapter::GetUrlSignature(const string& httpMethod, const string& operationType, map<string, string>& httpHeader, const map<string, string>& parameterList, const string& content, const string& signKey, const LOGSigType sigType) { string contentMd5; string signature; string osstream; if(! content.empty()) { contentMd5=CodecTool::CalcMD5(content); } string contentType; map<string, string>::iterator iter = httpHeader.find(CONTENT_TYPE); if (iter != httpHeader.end()) { contentType = iter->second; } std::map<string,string> endingMap; switch(sigType) { case BASE64_SHA1_MD5: osstream.append(httpMethod); osstream.append("\n"); osstream.append(contentMd5); osstream.append("\n"); osstream.append(contentType); osstream.append("\n"); osstream.append(httpHeader[DATE]); osstream.append("\n"); for(map<string,string>::const_iterator iter=httpHeader.begin(); iter!=httpHeader.end(); ++iter) { if(CodecTool::StartWith(iter->first, LOG_HEADER_PREFIX) || CodecTool::StartWith(iter->first, ACS_HEADER_PREFIX)) { endingMap.insert(std::make_pair(iter->first, iter->second)); } } for(map<string,string>::const_iterator it=endingMap.begin(); it!=endingMap.end(); ++it) { osstream.append(it->first); osstream.append(":"); osstream.append(it->second); osstream.append("\n"); } osstream.append(operationType); if (parameterList.size() > 0) osstream.append("?"); for(map<string,string>::const_iterator iter=parameterList.begin(); iter!=parameterList.end(); ++iter) { if(iter != parameterList.begin()) { osstream.append("&"); } osstream.append(iter->first); osstream.append("="); osstream.append(iter->second); } signature=CodecTool::Base64Enconde(CodecTool::CalcSHA1(osstream, signKey)); break; default: throw LOGException(LOGE_NOT_IMPLEMENTED, "Signature Version does not support."); } return signature; } }