client.cpp (1,673 lines of code) (raw):

#include "client.h" #include "adapter.h" #include "common.h" #include "rapidjson/document.h" #include "rapidjson/writer.h" #include "rapidjson/stringbuffer.h" #include <cstdio> #include <cstdlib> #include <string> #include "util.h" using namespace std; using namespace aliyun_log_sdk_v6::pb; using namespace rapidjson; extern const char* const aliyun_log_sdk_v6::LOG_SDK_IDENTIFICATION = "sls-cpp-sdk v0.6.2"; namespace aliyun_log_sdk_v6 { /************************ common method ***********************/ /************************ json method *************************/ void ExtractJsonResult(const string& response, rapidjson::Document& document) { document.Parse(response.c_str()); if (document.HasParseError()) { throw JsonException("ParseException", "Fail to parse from json string"); } } void JsonMemberCheck(const rapidjson::Value& value, const char* name) { if (!value.IsObject()) { throw JsonException("InvalidObjectException", "response is not valid JSON object"); } if (!value.HasMember(name)) { throw JsonException("NoMemberException", string("Member ") + name + " does not exist"); } } void ExtractJsonResult(const rapidjson::Value& value, const char* name, int32_t& number) { JsonMemberCheck(value, name); if (value[name].IsInt()) { number = value[name].GetInt(); } else { throw JsonException("ValueTypeException", string("Member ") + name + " is not int type"); } } void ExtractJsonResult(const rapidjson::Value& value, const char* name, uint32_t& number) { JsonMemberCheck(value, name); if (value[name].IsUint()) { number = value[name].GetUint(); } else { throw JsonException("ValueTypeException", string("Member ") + name + " is not uint type"); } } void ExtractJsonResult(const rapidjson::Value& value, const char* name, int64_t& number) { JsonMemberCheck(value, name); if (value[name].IsInt64()) { number = value[name].GetInt64(); } else { throw JsonException("ValueTypeException", string("Member ") + name + " is not int type"); } } void ExtractJsonResult(const rapidjson::Value& value, const char* name, uint64_t& number) { JsonMemberCheck(value, name); if (value[name].IsUint64()) { number = value[name].GetUint64(); } else { throw JsonException("ValueTypeException", string("Member ") + name + " is not uint type"); } } void ExtractJsonResult(const rapidjson::Value& value, const char* name, bool& boolean) { JsonMemberCheck(value, name); if (value[name].IsBool()) { boolean = value[name].GetBool(); } else { throw JsonException("ValueTypeException", string("Member ") + name + " is not boolean type"); } } void ExtractJsonResult(const rapidjson::Value& value, const char* name, string& dst) { JsonMemberCheck(value, name); if (value[name].IsString()) { dst = value[name].GetString(); } else { throw JsonException("ValueTypeException", string("Member ") + name + " is not string type"); } } const rapidjson::Value& GetJsonValue(const rapidjson::Value& value, const char* name) { JsonMemberCheck(value, name); if (value[name].IsObject() || value[name].IsArray()) { return value[name]; } else { throw JsonException("ValueTypeException", string("Member ") + name + " is not json value type"); } } static void ErrorCheck(const string& response, const string& requestId, const int32_t httpCode) { rapidjson::Document document; try { ExtractJsonResult(response, document); string errorCode; ExtractJsonResult(document, LOG_ERROR_CODE, errorCode); string errorMessage; ExtractJsonResult(document, LOG_ERROR_MESSAGE, errorMessage); throw LOGException(errorCode, errorMessage, requestId, httpCode); } catch (JsonException& e) { if (httpCode == 500) { throw LOGException(LOGE_INTERNAL_SERVER_ERROR, response, requestId, httpCode); } else { throw LOGException(LOGE_BAD_RESPONSE, string("Unextractable error:") + response, requestId, httpCode); } } } static int32_t ParseLogGroupList(const int32_t logGroupCount, const uint32_t uncompressedSize, const string& content, pb::LogGroupList& logGroupList) { string uncompressed = ""; if (! aliyun_log_sdk_v6::CompressAlgorithm::UncompressLz4(content, uncompressedSize, uncompressed)) { return 1; } if (logGroupCount > 0) { if (!logGroupList.ParseFromString(uncompressed) || (int32_t)logGroupList.logGroupList.size() != logGroupCount) { logGroupList.Clear(); return 2; } } return 0; } bool ShardItem::isReadOnly() { return status == LOG_SHARD_STATUS_READONLY; } static void ParseBatchLogData(const string& nextCursor, const pb::LogGroupList& logGroupList, BatchLogData& batchLogData) { batchLogData.nextCursor = nextCursor; batchLogData.logGroupCount = logGroupList.logGroupList.size(); for (const auto& logGroup : logGroupList.logGroupList) { vector<LogItem> logItems; for (const Log& log : logGroup.logs) { LogItem logItem; logItem.timestamp = log.time; logItem.topic = logGroup.topic; logItem.source = logGroup.source; for (const auto& content : log.contents) { logItem.data.push_back(pair<string, string>(content.key, content.value)); } logItems.push_back(logItem); } batchLogData.logGroups.push_back(std::move(logItems)); } } LOGClient::LOGClient(const string& slsHost, const string& accessKeyId, const string& accessKey, int32_t timeout, const string& source, bool compressFlag): mSlsHost(slsHost), mAccessKeyId(accessKeyId), mAccessKey(accessKey), mSource(source), mCompressFlag(compressFlag), mTimeout(timeout), mUserAgent(LOG_SDK_IDENTIFICATION), mHostFieldSuffix(""), mIsHostRawIp(false), mGetDateString(CodecTool::GetDateString), mLOGSend(LOGAdapter::Send) { internal::InitNetWork(); SetSlsHost(slsHost); if(mSource=="") { mSource = internal::GetHostIp(); } if(mTimeout<=0) { mTimeout = LOG_REQUEST_TIMEOUT; } mMaxSendSpeedInBytePerSec = 1024 * 1024 * 1024; } LOGClient::LOGClient(const string& slsHost, const string& accessKeyId, const string& accessKey, const std::string& securityToken, int32_t timeout, const string& source, bool compressFlag): mSlsHost(slsHost), mAccessKeyId(accessKeyId), mAccessKey(accessKey), mSecurityToken(securityToken), mSource(source), mCompressFlag(compressFlag), mTimeout(timeout), mUserAgent(LOG_SDK_IDENTIFICATION), mHostFieldSuffix(""), mIsHostRawIp(false), mGetDateString(CodecTool::GetDateString), mLOGSend(LOGAdapter::Send) { internal::InitNetWork(); SetSlsHost(slsHost); if(mSource=="") { mSource = internal::GetHostIp(); } if(mTimeout<=0) { mTimeout = LOG_REQUEST_TIMEOUT; } mMaxSendSpeedInBytePerSec = 1024 * 1024 * 1024; } LOGClient::~LOGClient() { internal::CleanNetWork(); } static void ConvertLogGroup(const vector<LogItem>& logItems, pb::LogGroup& logGroup) { if (logItems.size()==0) { throw LOGException(LOGE_PARAMETER_INVALID, "Empty LogItem."); } for (const auto& logItem : logItems) { Log log(logItem.timestamp, {}); for (auto& p : logItem.data) { log.contents.push_back(LogContent{p.first, p.second}); } logGroup.logs.push_back(std::move(log)); } } void LOGClient::SetAccessKey(const string& accessKey) { std::lock_guard<std::mutex> lock(mMutex); mAccessKey = accessKey; } string LOGClient::GetAccessKey() { std::lock_guard<std::mutex> lock(mMutex); return mAccessKey; } void LOGClient::SetAccessKeyId(const string& accessKeyId) { std::lock_guard<std::mutex> lock(mMutex); mAccessKeyId = accessKeyId; } string LOGClient::GetAccessKeyId() { std::lock_guard<std::mutex> lock(mMutex); return mAccessKeyId; } string LOGClient::GetSlsHost() { std::lock_guard<std::mutex> lock(mMutex); return mSlsHost; } string LOGClient::GetHostFieldSuffix() { std::lock_guard<std::mutex> lock(mMutex); return mHostFieldSuffix; } void LOGClient::SetSlsHost(const string& slsHost) { std::lock_guard<std::mutex> lock(mMutex); //mSlsHost = slsHost; size_t bpos = slsHost.find("://"); if(bpos == string::npos) bpos = 0; else bpos += 3; string tmpstr = slsHost.substr(bpos); size_t epos = tmpstr.find_first_of("/"); if(epos == string::npos) epos = tmpstr.length(); string host = tmpstr.substr(0,epos); mSlsHost = host; mHostFieldSuffix = "." + host; size_t i = 0; for(; i < host.length(); ++i) { if((host[i] >= 'a' && host[i] <= 'z') || (host[i] >= 'A' && host[i] <= 'Z')) break; } if(i == host.length()) mIsHostRawIp = true; else mIsHostRawIp = false; } void LOGClient::SetCommonHeader(map<string, string>& httpHeader, int32_t contentLength, const string& project) { if (project != "") { httpHeader[HOST] = project + GetHostFieldSuffix(); } else { httpHeader[HOST] = GetSlsHost(); } httpHeader[USER_AGENT] = mUserAgent; httpHeader[X_LOG_APIVERSION] = LOG_API_VERSION; httpHeader[X_LOG_SIGNATUREMETHOD] = HMAC_SHA1; httpHeader[DATE] = CodecTool::GetDateString(); httpHeader[CONTENT_LENGTH] = std::to_string(contentLength); if(!mSecurityToken.empty()) { httpHeader[X_ACS_SECURITY_TOKEN] = mSecurityToken; } } void LOGClient::SetCommonParameter(map<string, string>& parameterList) { } string LOGClient::GetHost(const string& project) { if(mIsHostRawIp || project == "") { return GetSlsHost(); } else { return project + GetHostFieldSuffix(); } } void LOGClient::SendRequest(const string& project, const string& httpMethod, const string& url, const string& body, const map<string, string>& parameterList, map<string, string>& header, HttpMessage& httpMessage) { string host = GetHost(project); SetCommonHeader(header, body.length(), project); string signature = LOGAdapter::GetUrlSignature(httpMethod, url, header, parameterList, body, GetAccessKey()); header[AUTHORIZATION] = LOG_HEADSIGNATURE_PREFIX + GetAccessKeyId() + ':' + signature; string queryString; LOGAdapter::GetQueryString(parameterList, queryString); mLOGSend(httpMethod, host, 80, url, queryString, header, body, mTimeout, httpMessage, mMaxSendSpeedInBytePerSec); if (httpMessage.statusCode != 200) { ErrorCheck(httpMessage.content, httpMessage.header[X_LOG_REQUEST_ID], httpMessage.statusCode); } } PostLogStoreLogsResponse LOGClient::PostLogStoreLogs(const string& project, const string& logstore, const string& topic, const vector<LogItem>& logItems,const std::string& hashKey) { pb::LogGroup logGroup; ConvertLogGroup(logItems, logGroup); logGroup.topic = topic; logGroup.source = mSource; return PostLogStoreLogs(project, logstore, logGroup, hashKey); } PostLogStoreLogsResponse LOGClient::PostLogStoreLogs(const string& project, const string& logstore, const pb::LogGroup& logGroup,const std::string& hashKey) { string body; string serializeData; if (logGroup.source.empty()) { pb::LogGroup newLogGroup; newLogGroup = logGroup; newLogGroup.source = mSource; newLogGroup.SerializeToString(serializeData); } else { logGroup.SerializeToString(serializeData); } string operation = LOGSTORES; operation.append("/").append(logstore); if(hashKey.empty() ) operation.append(""); else operation.append("/shards/route"); map<string, string> httpHeader; httpHeader[CONTENT_TYPE] = TYPE_LOG_PROTOBUF; if(mCompressFlag) { if(! aliyun_log_sdk_v6::CompressAlgorithm::CompressLz4(serializeData, body)) { throw LOGException(LOGE_UNKNOWN_ERROR, "Data compress failed."); } httpHeader[X_LOG_COMPRESSTYPE] = LOG_LZ4; } else { body = serializeData; } httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(serializeData.size()); httpHeader[CONTENT_MD5] = CodecTool::CalcMD5(body); map<string, string> parameterList; if(hashKey.empty() == false) parameterList["key"] = hashKey; SetCommonParameter(parameterList); HttpMessage httpResponse; SendRequest(project, HTTP_POST, operation, body, parameterList, httpHeader, httpResponse); PostLogStoreLogsResponse ret; ret.bodyBytes = body.size(); ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; return ret; } static void ExtractLogMeta(HttpMessage& httpMessage, LogMeta& logMeta) { rapidjson::Document document; try { ExtractJsonResult(httpMessage.content, document); logMeta.progress = httpMessage.header[X_LOG_PROGRESS]; logMeta.count = atoi(httpMessage.header[X_LOG_COUNT].c_str()); //const rapidjson::Value& histograms = GetJsonValue(document, "histograms"); for (rapidjson::Value::ConstValueIterator itr = document.Begin(); itr != document.End(); ++itr) { logMeta.metaItems.push_back(LogMetaItem()); LogMetaItem& metaItem = logMeta.metaItems.back(); ExtractJsonResult(*itr, "from", metaItem.from); ExtractJsonResult(*itr, "to", metaItem.to); ExtractJsonResult(*itr, "count", metaItem.count); if ((*itr).HasMember("progress")) { ExtractJsonResult(*itr, "progress", metaItem.progress); } } } catch(JsonException& e) { throw LOGException(LOGE_BAD_RESPONSE, e.GetMessage() + "\tbad json format:" + httpMessage.content, httpMessage.header[X_LOG_REQUEST_ID], httpMessage.statusCode); } } GetLogStoreHistogramResponse LOGClient::GetLogStoreHistogram(const string& project, const string& logstore, const string& topic, time_t beginTime, time_t endTime, const string& query) { string operation = LOGSTORES; operation.append("/").append(logstore).append(INDEX); string body = ""; map<string, string> parameterList; SetCommonParameter(parameterList); parameterList["type"] = "histogram"; parameterList["topic"] = topic; parameterList["from"] = std::to_string(beginTime); parameterList["to"] = std::to_string(endTime); parameterList["query"] = query; map<string, string> httpHeader; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); httpHeader[CONTENT_TYPE] = ""; HttpMessage httpResponse; SendRequest(project, HTTP_GET, operation, body, parameterList, httpHeader, httpResponse); GetLogStoreHistogramResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; ExtractLogMeta(httpResponse, ret.result); return ret; } static void ExtractLogs(HttpMessage& httpMessage, LogResult& logResult) { rapidjson::Document document; try { ExtractJsonResult(httpMessage.content, document); logResult.progress = httpMessage.header[X_LOG_PROGRESS]; logResult.logline = atoi(httpMessage.header[X_LOG_COUNT].c_str()); //const rapidjson::Value& logs = GetJsonValue(document, "logs"); for (rapidjson::Value::ConstValueIterator itr = document.Begin(); itr != document.End(); ++itr) { logResult.logdatas.push_back(LogItem()); LogItem& logItem = logResult.logdatas.back(); string logTimeStamp = ""; ExtractJsonResult(*itr, LOGITEM_TIME_STAMP_LABEL, logTimeStamp); logItem.timestamp = atoi(logTimeStamp.c_str()); ExtractJsonResult(*itr, LOGITEM_SOURCE_LABEL, logItem.source); for (rapidjson::Value::ConstMemberIterator mItr = itr->MemberBegin(); mItr != itr->MemberEnd(); ++mItr) { if (string(mItr->name.GetString()) != string(LOGITEM_TIME_STAMP_LABEL) && string(mItr->name.GetString()) != string(LOGITEM_SOURCE_LABEL)) { if (mItr->value.IsString()) { logItem.data.push_back(pair<string,string>(mItr->name.GetString(), mItr->value.GetString())); } else { throw LOGException(LOGE_BAD_RESPONSE, string("Invalid json format, value is not string type\tbad json format:") + httpMessage.content, httpMessage.header[X_LOG_REQUEST_ID], httpMessage.statusCode); } } } } } catch(JsonException& e) { throw LOGException(LOGE_BAD_RESPONSE, e.GetMessage() + "\tbad json format:" + httpMessage.content, httpMessage.header[X_LOG_REQUEST_ID], httpMessage.statusCode); } } GetLogStoreLogsResponse LOGClient::GetLogStoreLogs(const string& project, const string& logstore, const string& topic, time_t beginTime, time_t endTime, bool reverseFlag, int32_t lines, int32_t offset, const string& query) { string operation = LOGSTORES; operation.append("/").append(logstore).append(INDEX); string body = ""; map<string, string> parameterList; SetCommonParameter(parameterList); parameterList["type"]="log"; parameterList["topic"]=topic; parameterList["from"] = std::to_string(beginTime); parameterList["to"] = std::to_string(endTime); parameterList["reverse"] = reverseFlag ? "true" : "false"; parameterList["line"] = std::to_string(lines); parameterList["offset"] = std::to_string(offset); parameterList["query"]=query; map<string, string> httpHeader; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); httpHeader[CONTENT_TYPE] = ""; HttpMessage httpResponse; SendRequest(project, HTTP_GET, operation, body, parameterList, httpHeader, httpResponse); GetLogStoreLogsResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; ExtractLogs(httpResponse, ret.result); return ret; } CreateConsumerGroupResponse LOGClient::CreateConsumerGroup(const std::string& project , const std::string& logstore, const std::string& consumergroup, const uint32_t timeoutInSec, const bool inOrder) { ConsumerGroup group(consumergroup, timeoutInSec, inOrder); string operation = "/logstores/" + logstore + "/consumergroups"; string body = group.ToRequestJsonString(); map<string, string> httpHeader; httpHeader[CONTENT_TYPE] = TYPE_LOG_JSON; httpHeader[CONTENT_MD5] = CodecTool::CalcMD5(body); httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); map<string, string> parameterList; SetCommonParameter(parameterList); HttpMessage httpResponse; SendRequest(project, HTTP_POST, operation, body, parameterList, httpHeader, httpResponse); CreateConsumerGroupResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; return ret; } DeleteConsumerGroupResponse LOGClient::DeleteConsumerGroup(const std::string& project , const std::string& logstore, const std::string& consumergroup) { string operation = "/logstores/" + logstore + "/consumergroups/" + consumergroup; map<string, string> httpHeader; map<string, string> parameterList; SetCommonParameter(parameterList); HttpMessage httpResponse; SendRequest(project, HTTP_DELETE, operation, "", parameterList, httpHeader, httpResponse); DeleteConsumerGroupResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; return ret; } Response LOGClient::CreateLogStore(const string& project, const LogStore& logStore) { string body = logStore.ToRequestJsonString(); string operation = LOGSTORES; map<string, string> httpHeader; httpHeader[CONTENT_TYPE] = TYPE_LOG_JSON; httpHeader[CONTENT_MD5] = CodecTool::CalcMD5(body); httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); map<string, string> parameterList; SetCommonParameter(parameterList); HttpMessage httpResponse; SendRequest(project, HTTP_POST, operation, body, parameterList, httpHeader, httpResponse); Response ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; return ret; } Response LOGClient::UpdateLogStore(const string& project, const LogStore& logStore) { string body = logStore.ToRequestJsonString(); string operation = LOGSTORES; operation.append("/").append(logStore.GetLogStoreName()); map<string, string> httpHeader; httpHeader[CONTENT_TYPE] = TYPE_LOG_JSON; httpHeader[CONTENT_MD5] = CodecTool::CalcMD5(body); httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); map<string, string> parameterList; SetCommonParameter(parameterList); HttpMessage httpResponse; SendRequest(project, HTTP_PUT, operation, body, parameterList, httpHeader, httpResponse); Response ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; return ret; } Response LOGClient::DeleteLogStore(const string& project, const string& logStore) { string body = ""; string operation = LOGSTORES; operation.append("/").append(logStore); map<string, string> httpHeader; httpHeader[CONTENT_TYPE] = ""; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); map<string, string> parameterList; SetCommonParameter(parameterList); HttpMessage httpResponse; SendRequest(project, HTTP_DELETE, operation, body, parameterList, httpHeader, httpResponse); Response ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; return ret; } static void ExtractLogStore(HttpMessage& httpMessage, LogStore& logStore) { try { logStore.FromJsonString(httpMessage.content); } catch (JsonException& e) { throw LOGException(LOGE_BAD_RESPONSE, e.GetMessage() + "\tbad json format:" + httpMessage.content, httpMessage.header[X_LOG_REQUEST_ID], httpMessage.statusCode); } } GetLogStoreResponse LOGClient::GetLogStore(const string& project, const string& logStore) { string body = ""; string operation = LOGSTORES; operation.append("/").append(logStore); map<string, string> httpHeader; httpHeader[CONTENT_TYPE] = ""; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); map<string, string> parameterList; SetCommonParameter(parameterList); HttpMessage httpResponse; SendRequest(project, HTTP_GET, operation, body, parameterList, httpHeader, httpResponse); GetLogStoreResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; ExtractLogStore(httpResponse, ret.result); return ret; } static void ExtractHeartbeat(HttpMessage& httpMessage, std::vector<uint32_t>& shards) { rapidjson::Document doc; try { ExtractJsonResult(httpMessage.content, doc); auto array = doc.GetArray(); for (Value::ConstValueIterator itr = array.Begin(); itr != array.End(); ++itr) { shards.push_back(itr->GetUint()); } } catch(JsonException& e) { throw LOGException(LOGE_BAD_RESPONSE, e.GetMessage() + "\tbad json format:" + httpMessage.content, httpMessage.header[X_LOG_REQUEST_ID], httpMessage.statusCode); } } static void ExtractConsumerGroupCheckpoints(HttpMessage& httpMessage, vector<ConsumerGroupCheckpoint>& cps) { rapidjson::Document doc; try { ExtractJsonResult(httpMessage.content, doc); auto array = doc.GetArray(); for (Value::ConstValueIterator itr = array.Begin(); itr != array.End(); ++itr) { ConsumerGroupCheckpoint cp((*itr)["shard"].GetUint(), (*itr)["checkpoint"].GetString(), (*itr)["updateTime"].GetUint64()); cps.push_back(cp); } } catch(JsonException& e) { throw LOGException(LOGE_BAD_RESPONSE, e.GetMessage() + "\tbad json format:" + httpMessage.content, httpMessage.header[X_LOG_REQUEST_ID], httpMessage.statusCode); } } static void ExtractConsumerGroups(HttpMessage& httpMessage, vector<ConsumerGroup>& consumerGroups) { rapidjson::Document doc; try { ExtractJsonResult(httpMessage.content, doc); auto array = doc.GetArray(); for (Value::ConstValueIterator itr = array.Begin(); itr != array.End(); ++itr) { ConsumerGroup group((*itr)["name"].GetString(), (*itr)["timeout"].GetUint(), (*itr)["order"].GetBool()); consumerGroups.push_back(group); } } catch(JsonException& e) { throw LOGException(LOGE_BAD_RESPONSE, e.GetMessage() + "\tbad json format:" + httpMessage.content, httpMessage.header[X_LOG_REQUEST_ID], httpMessage.statusCode); } } static void ExtractLogStores(HttpMessage& httpMessage, vector<string>& logStoresResult) { rapidjson::Document document; try { ExtractJsonResult(httpMessage.content, document); const rapidjson::Value& logStores = GetJsonValue(document, "logstores"); for (rapidjson::Value::ConstValueIterator itr = logStores.Begin(); itr != logStores.End(); ++itr) { if (itr->IsString()) { logStoresResult.push_back(itr->GetString()); } else { throw LOGException(LOGE_BAD_RESPONSE, string("Invalid json format, value is not string type\tbad json format:") + httpMessage.content, httpMessage.header[X_LOG_REQUEST_ID], httpMessage.statusCode); } } } catch(JsonException& e) { throw LOGException(LOGE_BAD_RESPONSE, e.GetMessage() + "\tbad json format:" + httpMessage.content, httpMessage.header[X_LOG_REQUEST_ID], httpMessage.statusCode); } } ListConsumerGroupResponse LOGClient::ListConsumerGroup(const string& project, const std::string& logstore) { string operation = "/logstores/" + logstore + "/consumergroups"; map<string, string> parameterList; SetCommonParameter(parameterList); map<string, string> httpHeader; HttpMessage httpResponse; SendRequest(project, HTTP_GET, operation, "", parameterList, httpHeader, httpResponse); ListConsumerGroupResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; ExtractConsumerGroups(httpResponse, ret.consumerGroups); return ret; } HeartbeatResponse LOGClient::ConsumerGroupHeartbeat(const std::string& project, const std::string& logstore, const std::string& consumergroup, const std::string& consumer, const std::vector<uint32_t>& shards) { string operation = "/logstores/" + logstore + "/consumergroups/" + consumergroup; Heartbeat hb; for(std::vector<uint32_t>::const_iterator it = shards.begin(); it != shards.end(); ++it) hb.Add(*it); string body = hb.ToRequestJsonString(true); map<string, string> httpHeader; httpHeader[CONTENT_TYPE] = TYPE_LOG_JSON; httpHeader[CONTENT_MD5] = CodecTool::CalcMD5(body); httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); map<string, string> parameterList; SetCommonParameter(parameterList); parameterList["type"] = "heartbeat"; parameterList["consumer"] = consumer; HttpMessage httpResponse; SendRequest(project, HTTP_POST, operation, body, parameterList, httpHeader, httpResponse); HeartbeatResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; ExtractHeartbeat(httpResponse, ret.shards); return ret; } UpdateCheckpointResponse LOGClient::UpdateCheckpoint(const std::string& project, const std::string& logstore, const std::string& consumergroup, const int shard, const std::string& checkpoint) { string operation = "/logstores/" + logstore + "/consumergroups/" + consumergroup; ConsumerGroupCheckpoint cp(shard, checkpoint); string body = cp.ToRequestJsonString(); map<string, string> httpHeader; httpHeader[CONTENT_TYPE] = TYPE_LOG_JSON; httpHeader[CONTENT_MD5] = CodecTool::CalcMD5(body); httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); map<string, string> parameterList; SetCommonParameter(parameterList); parameterList["type"] = "checkpoint"; parameterList["forceSuccess"] = "true"; HttpMessage httpResponse; SendRequest(project, HTTP_POST, operation, body, parameterList, httpHeader, httpResponse); UpdateCheckpointResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; return ret; } ListCheckpointResponse LOGClient::ListCheckpoint(const std::string& project, const std::string& logstore, const std::string& consumergroup) { string operation = "/logstores/" + logstore + "/consumergroups/" + consumergroup; map<string, string> parameterList; SetCommonParameter(parameterList); map<string, string> httpHeader; HttpMessage httpResponse; SendRequest(project, HTTP_GET, operation, "", parameterList, httpHeader, httpResponse); ListCheckpointResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; ExtractConsumerGroupCheckpoints(httpResponse, ret.consumerGroupCheckpoints); return ret; } ListLogStoresResponse LOGClient::ListLogStores(const string& project, const string& logstorename, const int32_t& offset, const int32_t& size) { const string operation = LOGSTORES; string body = ""; map<string, string> parameterList; SetCommonParameter(parameterList); if (logstorename != "") { parameterList["logstorename"] = logstorename; } if (offset >= 0) { parameterList["offset"] = std::to_string(offset); } if (size >= 0) { parameterList["size"] = std::to_string(size); } map<string, string> httpHeader; httpHeader[CONTENT_TYPE] = ""; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); HttpMessage httpResponse; SendRequest(project, HTTP_GET, operation, body, parameterList, httpHeader, httpResponse); ListLogStoresResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; ExtractLogStores(httpResponse, ret.result); return ret; } Response LOGClient::CreateIndex(const string& project, const string& logStore, const Index& index) { string body = index.ToRequestJsonString(); string operation = LOGSTORES; operation.append("/").append(logStore).append(INDEX); map<string, string> httpHeader; httpHeader[CONTENT_TYPE] = TYPE_LOG_JSON; httpHeader[CONTENT_MD5] = CodecTool::CalcMD5(body); httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); map<string, string> parameterList; SetCommonParameter(parameterList); HttpMessage httpResponse; SendRequest(project, HTTP_POST, operation, body, parameterList, httpHeader, httpResponse); Response ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; return ret; } Response LOGClient::UpdateIndex(const string& project, const string& logStore, const Index& index) { string body = index.ToRequestJsonString(); string operation = LOGSTORES; operation.append("/").append(logStore).append(INDEX); map<string, string> httpHeader; httpHeader[CONTENT_TYPE] = TYPE_LOG_JSON; httpHeader[CONTENT_MD5] = CodecTool::CalcMD5(body); httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); map<string, string> parameterList; SetCommonParameter(parameterList); HttpMessage httpResponse; SendRequest(project, HTTP_PUT, operation, body, parameterList, httpHeader, httpResponse); Response ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; return ret; } static void ExtractIndex(HttpMessage& httpMessage, Index& index) { try { index.FromJsonString(httpMessage.content); } catch (JsonException& e) { throw LOGException(LOGE_BAD_RESPONSE, e.GetMessage() + "\tbad json format:" + httpMessage.content, httpMessage.header[X_LOG_REQUEST_ID], httpMessage.statusCode); } } GetIndexResponse LOGClient::GetIndex(const string& project, const string& logStore) { string body = ""; string operation = LOGSTORES; operation.append("/").append(logStore).append(INDEX); map<string, string> httpHeader; httpHeader[CONTENT_TYPE] = ""; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); map<string, string> parameterList; SetCommonParameter(parameterList); HttpMessage httpResponse; SendRequest(project, HTTP_GET, operation, body, parameterList, httpHeader, httpResponse); GetIndexResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; ExtractIndex(httpResponse, ret.result); return ret; } Response LOGClient::DeleteIndex(const string& project, const string& logStore) { string body = ""; string operation = LOGSTORES; operation.append("/").append(logStore).append(INDEX); map<string, string> httpHeader; httpHeader[CONTENT_TYPE] = ""; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); map<string, string> parameterList; SetCommonParameter(parameterList); HttpMessage httpResponse; SendRequest(project, HTTP_DELETE, operation, body, parameterList, httpHeader, httpResponse); Response ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; return ret; } Response LOGClient::CreateConfig(const string& project, const Config& config) { string body = config.ToRequestJsonString(); string operation = CONFIGS; map<string, string> httpHeader; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); httpHeader[CONTENT_TYPE] = ""; httpHeader[CONTENT_MD5] = CodecTool::CalcMD5(body); map<string, string> parameterList; SetCommonParameter(parameterList); HttpMessage httpResponse; SendRequest(project, HTTP_POST, operation, body, parameterList, httpHeader, httpResponse); Response ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; return ret; } Response LOGClient::UpdateConfig(const string& project, const Config& config) { string body = config.ToRequestJsonString(); string operation = CONFIGS; operation.append("/").append(config.GetConfigName()); map<string, string> httpHeader; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); httpHeader[CONTENT_TYPE] = ""; httpHeader[CONTENT_MD5] = CodecTool::CalcMD5(body); map<string, string> parameterList; SetCommonParameter(parameterList); HttpMessage httpResponse; SendRequest(project, HTTP_PUT, operation, body, parameterList, httpHeader, httpResponse); Response ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; return ret; } Response LOGClient::DeleteConfig(const string& project, const string& config) { string body = ""; string operation = CONFIGS; operation.append("/").append(config); map<string, string> httpHeader; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); httpHeader[CONTENT_TYPE] = ""; map<string, string> parameterList; SetCommonParameter(parameterList); HttpMessage httpResponse; SendRequest(project, HTTP_DELETE, operation, body, parameterList, httpHeader, httpResponse); Response ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; return ret; } static void ExtractConfig(HttpMessage& httpMessage, Config& config) { try { config.FromJsonString(httpMessage.content); } catch (JsonException& e) { throw LOGException(LOGE_BAD_RESPONSE, e.GetMessage() + "\tbad json format:" + httpMessage.content, httpMessage.header[X_LOG_REQUEST_ID], httpMessage.statusCode); } } GetConfigResponse LOGClient::GetConfig(const string& project, const string& config) { string body = ""; string operation = CONFIGS; operation.append("/").append(config); map<string, string> httpHeader; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); httpHeader[CONTENT_TYPE] = ""; map<string, string> parameterList; SetCommonParameter(parameterList); HttpMessage httpResponse; SendRequest(project, HTTP_GET, operation, body, parameterList, httpHeader, httpResponse); GetConfigResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; ExtractConfig(httpResponse, ret.result); return ret; } static void ExtractConfigs(HttpMessage& httpMessage, vector<string>& configsResult) { rapidjson::Document document; try { ExtractJsonResult(httpMessage.content, document); const rapidjson::Value& configs = GetJsonValue(document, "configs"); for (rapidjson::Value::ConstValueIterator itr = configs.Begin(); itr != configs.End(); ++itr) { if (itr->IsString()) { configsResult.push_back(itr->GetString()); } else { throw LOGException(LOGE_BAD_RESPONSE, string("Invalid json format, value is not string type\tbad json format:") + httpMessage.content, httpMessage.header[X_LOG_REQUEST_ID], httpMessage.statusCode); } } } catch(JsonException& e) { throw LOGException(LOGE_BAD_RESPONSE, e.GetMessage() + "\tbad json format:" + httpMessage.content, httpMessage.header[X_LOG_REQUEST_ID], httpMessage.statusCode); } } ListConfigsResponse LOGClient::ListConfigs(const string& project, const string& configName, const int32_t& offset, const int32_t& size) { const string operation = CONFIGS; string body; map<string, string> parameterList; SetCommonParameter(parameterList); if (configName != "") { parameterList["configname"] = configName; } if (offset >= 0) { parameterList["offset"] = std::to_string(offset); } if (size >= 0) { parameterList["size"] = std::to_string(size); } map<string, string> httpHeader; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); httpHeader[CONTENT_TYPE] = ""; HttpMessage httpResponse; SendRequest(project, HTTP_GET, operation, body, parameterList, httpHeader, httpResponse); ListConfigsResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; ExtractConfigs(httpResponse, ret.result); return ret; } Response LOGClient::CreateMachineGroup(const string& project, const MachineGroup& machineGroup) { string body = machineGroup.ToRequestJsonString(); string operation = MACHINEGROUPS; map<string, string> httpHeader; httpHeader[CONTENT_TYPE] = TYPE_LOG_JSON; httpHeader[CONTENT_MD5] = CodecTool::CalcMD5(body); httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); map<string, string> parameterList; SetCommonParameter(parameterList); HttpMessage httpResponse; SendRequest(project, HTTP_POST, operation, body, parameterList, httpHeader, httpResponse); Response ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; return ret; } Response LOGClient::UpdateMachineGroup(const string& project, const MachineGroup& machineGroup) { string body = machineGroup.ToRequestJsonString(); string operation = MACHINEGROUPS; operation.append("/").append(machineGroup.GetGroupName()); map<string, string> httpHeader; httpHeader[CONTENT_TYPE] = TYPE_LOG_JSON; httpHeader[CONTENT_MD5] = CodecTool::CalcMD5(body); httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); map<string, string> parameterList; SetCommonParameter(parameterList); HttpMessage httpResponse; SendRequest(project, HTTP_PUT, operation, body, parameterList, httpHeader, httpResponse); Response ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; return ret; } Response LOGClient::DeleteMachineGroup(const string& project, const string& machineGroup) { string body = ""; string operation = MACHINEGROUPS; operation.append("/").append(machineGroup); map<string, string> httpHeader; httpHeader[CONTENT_TYPE] = TYPE_LOG_JSON; httpHeader[CONTENT_MD5] = CodecTool::CalcMD5(body); httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); map<string, string> parameterList; SetCommonParameter(parameterList); HttpMessage httpResponse; SendRequest(project, HTTP_DELETE, operation, body, parameterList, httpHeader, httpResponse); Response ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; return ret; } static void ExtractMachineGroup(HttpMessage& httpMessage, MachineGroup& machineGroup) { try { machineGroup.FromJsonString(httpMessage.content); } catch (JsonException& e) { throw LOGException(LOGE_BAD_RESPONSE, e.GetMessage() + "\tbad json format:" + httpMessage.content, httpMessage.header[X_LOG_REQUEST_ID], httpMessage.statusCode); } } GetMachineGroupResponse LOGClient::GetMachineGroup(const string& project, const string& machineGroup) { string body = ""; string operation = MACHINEGROUPS; operation.append("/").append(machineGroup); map<string, string> httpHeader; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); httpHeader[CONTENT_TYPE] = ""; map<string, string> parameterList; SetCommonParameter(parameterList); HttpMessage httpResponse; SendRequest(project, HTTP_GET, operation, body, parameterList, httpHeader, httpResponse); GetMachineGroupResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; ExtractMachineGroup(httpResponse, ret.result); return ret; } static void ExtractMachineGroups(HttpMessage& httpMessage, vector<string>& machineGroupsResult) { rapidjson::Document document; try { ExtractJsonResult(httpMessage.content, document); const rapidjson::Value& machineGroups = GetJsonValue(document, "machinegroups"); for (rapidjson::Value::ConstValueIterator itr = machineGroups.Begin(); itr != machineGroups.End(); ++itr) { if (itr->IsString()) { machineGroupsResult.push_back(itr->GetString()); } else { throw LOGException(LOGE_BAD_RESPONSE, string("Invalid json format, value is not string type\tbad json format:") + httpMessage.content, httpMessage.header[X_LOG_REQUEST_ID], httpMessage.statusCode); } } } catch(JsonException& e) { throw LOGException(LOGE_BAD_RESPONSE, e.GetMessage() + "\tbad json format:" + httpMessage.content, httpMessage.header[X_LOG_REQUEST_ID], httpMessage.statusCode); } } ListMachineGroupsResponse LOGClient::ListMachineGroups(const string& project, const int32_t& offset, const int32_t& size) { const string operation = MACHINEGROUPS; string body = ""; map<string, string> parameterList; SetCommonParameter(parameterList); if (offset >= 0) { parameterList["offset"] = std::to_string(offset); } if (size >= 0) { parameterList["size"] = std::to_string(size); } map<string, string> httpHeader; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); httpHeader[CONTENT_TYPE] = ""; HttpMessage httpResponse; SendRequest(project, HTTP_GET, operation, body, parameterList, httpHeader, httpResponse); ListMachineGroupsResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; ExtractMachineGroups(httpResponse, ret.result); return ret; } Response LOGClient::ApplyConfigToMachineGroup(const string& project, const string& machineGroup, const string& config) { string body = ""; string operation = MACHINEGROUPS; operation.append("/").append(machineGroup).append(CONFIGS) .append("/").append(config); map<string, string> httpHeader; httpHeader[CONTENT_TYPE] = TYPE_LOG_JSON; httpHeader[CONTENT_MD5] = CodecTool::CalcMD5(body); httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); map<string, string> parameterList; SetCommonParameter(parameterList); HttpMessage httpResponse; SendRequest(project, HTTP_PUT, operation, body, parameterList, httpHeader, httpResponse); Response ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; return ret; } Response LOGClient::RemoveConfigFromMachineGroup(const string& project, const string& machineGroup, const string& config) { string body = ""; string operation = MACHINEGROUPS; operation.append("/").append(machineGroup).append(CONFIGS) .append("/").append(config); map<string, string> httpHeader; httpHeader[CONTENT_TYPE] = TYPE_LOG_JSON; httpHeader[CONTENT_MD5] = CodecTool::CalcMD5(body); httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); map<string, string> parameterList; SetCommonParameter(parameterList); HttpMessage httpResponse; SendRequest(project, HTTP_DELETE, operation, body, parameterList, httpHeader, httpResponse); Response ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; return ret; } static void ExtractApplyConfigs(HttpMessage& httpMessage, vector<string>& configsResult) { rapidjson::Document document; try { ExtractJsonResult(httpMessage.content, document); const rapidjson::Value& configs = GetJsonValue(document, "configs"); for (rapidjson::Value::ConstValueIterator itr = configs.Begin(); itr != configs.End(); ++itr) { if (itr->IsString()) { configsResult.push_back(itr->GetString()); } else { throw LOGException(LOGE_BAD_RESPONSE, string("Invalid json format, value is not string type\tbad json format:") + httpMessage.content, httpMessage.header[X_LOG_REQUEST_ID], httpMessage.statusCode); } } } catch(JsonException& e) { throw LOGException(LOGE_BAD_RESPONSE, e.GetMessage() + "\tbad json format:" + httpMessage.content, httpMessage.header[X_LOG_REQUEST_ID], httpMessage.statusCode); } } GetAppliedConfigsResponse LOGClient::GetAppliedConfigs(const string& project, const string& machineGroup) { string body = ""; string operation = MACHINEGROUPS; operation.append("/").append(machineGroup).append(CONFIGS); map<string, string> httpHeader; httpHeader[CONTENT_TYPE] = TYPE_LOG_JSON; httpHeader[CONTENT_MD5] = CodecTool::CalcMD5(body); httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); map<string, string> parameterList; SetCommonParameter(parameterList); HttpMessage httpResponse; SendRequest(project, HTTP_GET, operation, body, parameterList, httpHeader, httpResponse); GetAppliedConfigsResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; ExtractApplyConfigs(httpResponse, ret.result); return ret; } LogStoreSqlResponse LOGClient::ExecuteLogStoreSql(const std::string &project, const std::string &logstore, time_t beginTime, time_t endTime, const std::string &query, bool powerSql) { string operation = LOGSTORES; operation.append("/").append(logstore).append(INDEX); string body = ""; map<string, string> parameterList; SetCommonParameter(parameterList); parameterList["type"] = "log"; parameterList["from"] = std::to_string(beginTime); parameterList["to"] = std::to_string(endTime); parameterList["query"] = query; parameterList["powerSql"] = powerSql ? "true" : "false"; map<string, string> httpHeader; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); httpHeader[CONTENT_TYPE] = ""; HttpMessage httpResponse; SendRequest(project, HTTP_GET, operation, body, parameterList, httpHeader, httpResponse); LogStoreSqlResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; if (httpResponse.header.count(X_LOG_PROCESSED_ROWS) > 0) ret.processedRows = atol(httpResponse.header.at(X_LOG_PROCESSED_ROWS).c_str()); if (httpResponse.header.count(X_LOG_ELASPED_MILLISECOND) > 0) ret.elapsedMilli = atol(httpResponse.header.at(X_LOG_ELASPED_MILLISECOND).c_str()); if (httpResponse.header.count(X_LOG_CPU_SEC) > 0) ret.cpuSec = atof(httpResponse.header.at(X_LOG_CPU_SEC).c_str()); if (httpResponse.header.count(X_LOG_CPU_CORES) > 0) ret.cpuCore = atol(httpResponse.header.at(X_LOG_CPU_CORES).c_str()); ExtractLogs(httpResponse, ret.result); return ret; } ProjectSqlResponse LOGClient::ExecuteProjectSql(const std::string &project, const std::string &query, bool powerSql) { string operation = "/logs"; string body = ""; map<string, string> parameterList; SetCommonParameter(parameterList); parameterList["query"] = query; parameterList["powerSql"] = powerSql ? "true" : "false"; map<string, string> httpHeader; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); httpHeader[CONTENT_TYPE] = ""; HttpMessage httpResponse; SendRequest(project, HTTP_GET, operation, body, parameterList, httpHeader, httpResponse); ProjectSqlResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; if (httpResponse.header.count(X_LOG_PROCESSED_ROWS) > 0) ret.processedRows = atol(httpResponse.header.at(X_LOG_PROCESSED_ROWS).c_str()); if (httpResponse.header.count(X_LOG_ELASPED_MILLISECOND) > 0) ret.elapsedMilli = atol(httpResponse.header.at(X_LOG_ELASPED_MILLISECOND).c_str()); if (httpResponse.header.count(X_LOG_CPU_SEC) > 0) ret.cpuSec = atof(httpResponse.header.at(X_LOG_CPU_SEC).c_str()); if (httpResponse.header.count(X_LOG_CPU_CORES) > 0) ret.cpuCore = atol(httpResponse.header.at(X_LOG_CPU_CORES).c_str()); ExtractLogs(httpResponse, ret.result); return ret; } CreateSqlInstanceResponse LOGClient::CreateSqlInstance(const std::string &project, int cu) { std::string operation = "/sqlinstance"; std::string body; rapidjson::StringBuffer stringBuffer; rapidjson::Writer<rapidjson::StringBuffer> writer(stringBuffer); writer.StartObject(); writer.Key("cu"); writer.Int(cu); writer.EndObject(); body = stringBuffer.GetString(); map<string, string> parameterList; SetCommonParameter(parameterList); map<string, string> httpHeader; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); httpHeader[CONTENT_TYPE] = TYPE_LOG_JSON; HttpMessage httpResponse; SendRequest(project, HTTP_POST, operation, body, parameterList, httpHeader, httpResponse); CreateSqlInstanceResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; return ret; } UpdateSqlInstanceResponse LOGClient::UpdateSqlInstance(const std::string &project, int cu) { std::string operation = "/sqlinstance"; std::string body; rapidjson::StringBuffer stringBuffer; rapidjson::Writer<rapidjson::StringBuffer> writer(stringBuffer); writer.StartObject(); writer.Key("cu"); writer.Int(cu); writer.EndObject(); body = stringBuffer.GetString(); map<string, string> parameterList; SetCommonParameter(parameterList); map<string, string> httpHeader; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); httpHeader[CONTENT_TYPE] = TYPE_LOG_JSON; HttpMessage httpResponse; SendRequest(project, HTTP_POST, operation, body, parameterList, httpHeader, httpResponse); UpdateSqlInstanceResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; return ret; } ListSqlInstanceResponse LOGClient::ListSqlInstance(const std::string &project) { std::string operation = "/sqlinstance"; std::string body; map<string, string> parameterList; SetCommonParameter(parameterList); map<string, string> httpHeader; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); httpHeader[CONTENT_TYPE] = ""; HttpMessage httpResponse; SendRequest(project, HTTP_POST, operation, body, parameterList, httpHeader, httpResponse); ListSqlInstanceResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; rapidjson::Document document; try { ExtractJsonResult(httpResponse.content, document); for (rapidjson::Value::ConstValueIterator itr = document.Begin(); itr != document.End(); ++itr) { SqlInstance sqlInstance; ExtractJsonResult(*itr, "name", sqlInstance.name); string value; ExtractJsonResult(*itr, "cu", value); sqlInstance.cu = atoi(value.c_str()); ExtractJsonResult(*itr, "updateTime", value); sqlInstance.updateTime = atoi(value.c_str()); ExtractJsonResult(*itr, "createTime", value); sqlInstance.createTime = atoi(value.c_str()); ret.sqlInstances.push_back(sqlInstance); } } catch (JsonException &e) { throw LOGException(LOGE_BAD_RESPONSE, e.GetMessage() + "\tbad json format:" + httpResponse.content, httpResponse.header[X_LOG_REQUEST_ID], httpResponse.statusCode); } return ret; } static void ExtractTopics(HttpMessage& httpMessage, vector<string>& result) { rapidjson::Document document; try { ExtractJsonResult(httpMessage.content, document); for (rapidjson::Value::ConstValueIterator itr = document.Begin(); itr != document.End(); ++itr) { if (itr->IsString()) { result.push_back(itr->GetString()); } else { throw LOGException(LOGE_BAD_RESPONSE, string("Invalid json format, value is not string type\tbad json format:") + httpMessage.content, httpMessage.header[X_LOG_REQUEST_ID], httpMessage.statusCode); } } } catch(JsonException& e) { throw LOGException(LOGE_BAD_RESPONSE, e.GetMessage() + "\tbad json format:" + httpMessage.content, httpMessage.header[X_LOG_REQUEST_ID], httpMessage.statusCode); } } ListLogStoreTopicsResponse LOGClient::ListLogStoreTopics(const string& project, const string& logstore, const int64_t& line, const string& nextToken) { string operation = LOGSTORES; operation.append("/").append(logstore).append(INDEX); string body; map<string, string> parameterList; SetCommonParameter(parameterList); parameterList["type"] = "topic"; parameterList["line"] = std::to_string(line); parameterList["token"] = nextToken; map<string, string> httpHeader; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); httpHeader[CONTENT_TYPE] = ""; HttpMessage httpResponse; SendRequest(project, HTTP_GET, operation, body, parameterList, httpHeader, httpResponse); ListLogStoreTopicsResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; ExtractTopics(httpResponse, ret.result); return ret; } GetCursorResponse LOGClient::GetCursor(const string& project, const string& logstore, uint32_t shardId, uint32_t fromTime) { return GetCursor(project, logstore, shardId, "from",std::to_string(fromTime)); } GetCursorResponse LOGClient::GetCursor(const string& project, const string& logstore, uint32_t shardId, LOG_Cursor_Mode cursorMode) { string cursorModePara; switch (cursorMode) { case CURSOR_MODE_BEGIN: cursorModePara = "begin"; break; case CURSOR_MODE_END: cursorModePara = "end"; break; default: cursorModePara = ""; } return GetCursor(project, logstore, shardId, "from", cursorModePara); } GetCursorResponse LOGClient::GetCursor(const string& project, const string& logstore, uint32_t shardId, const string& paraKey, const string& paraValue) { if (project.empty() || logstore.empty()) throw LOGException(LOGE_PARAMETER_INVALID, "project or logstore invalid."); string operation = LOGSTORES + string("/") + logstore + SHARDS + string("/") + std::to_string(shardId); string body; map<string, string> parameterList; SetCommonParameter(parameterList); parameterList[LOG_TYPE] = LOG_TYPE_CURSOR; parameterList[paraKey] = paraValue; map<string, string> httpHeader; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); httpHeader[CONTENT_TYPE] = ""; HttpMessage httpResponse; SendRequest(project, HTTP_GET, operation, body, parameterList, httpHeader, httpResponse); GetCursorResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; rapidjson::Document document; ExtractJsonResult(httpResponse.content, document); ExtractJsonResult(document, "cursor", ret.result); return ret; } ListShardsResponse LOGClient::ListShards(const string& project, const string& logstore) { if (project.empty() || logstore.empty()) throw LOGException(LOGE_PARAMETER_INVALID, "project or logstore invalid."); string operation = LOGSTORES + string("/") + logstore + SHARDS; string body = ""; map<string, string> parameterList; SetCommonParameter(parameterList); map<string, string> httpHeader; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); httpHeader[CONTENT_TYPE] = ""; HttpMessage httpResponse; SendRequest(project, HTTP_GET, operation, body, parameterList, httpHeader, httpResponse); rapidjson::Document document; ExtractJsonResult(httpResponse.content, document); ListShardsResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; if (document.IsArray()) { for (rapidjson::Value::ConstValueIterator itr = document.Begin(); itr != document.End(); ++itr) { if (itr->IsObject()) { ShardItem shardItem; ExtractJsonResult(*itr, "shardID", shardItem.shardId); ExtractJsonResult(*itr,"status",shardItem.status); ExtractJsonResult(*itr,"inclusiveBeginKey",shardItem.inclusiveBeginKey); ExtractJsonResult(*itr,"exclusiveEndKey",shardItem.exclusiveEndKey); ExtractJsonResult(*itr, "createTime", shardItem.createTime); ret.result.push_back(shardItem); } else { throw LOGException(LOGE_BAD_RESPONSE, string("Invalid json format, value is not json object type\tbad json format:") + httpResponse.content, httpResponse.header[X_LOG_REQUEST_ID], httpResponse.statusCode); } } } return ret; } SplitShardResponse LOGClient::SplitShard(const string& project, const string& logstore,const int shardId,const std::string& middleHash) { if (project.empty() || logstore.empty()) throw LOGException(LOGE_PARAMETER_INVALID, "project or logstore invalid."); string operation = LOGSTORES + string("/") + logstore + SHARDS+"/"+ std::to_string(shardId); ; string body = ""; map<string, string> parameterList; parameterList["key"] = middleHash; parameterList["action"] = "split"; SetCommonParameter(parameterList); map<string, string> httpHeader; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); httpHeader[CONTENT_TYPE] = ""; HttpMessage httpResponse; SendRequest(project, HTTP_POST, operation, body, parameterList, httpHeader, httpResponse); rapidjson::Document document; ExtractJsonResult(httpResponse.content, document); ListShardsResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; if (document.IsArray()) { for (rapidjson::Value::ConstValueIterator itr = document.Begin(); itr != document.End(); ++itr) { if (itr->IsObject()) { ShardItem shardItem; ExtractJsonResult(*itr, "shardID", shardItem.shardId); ExtractJsonResult(*itr,"status",shardItem.status); ExtractJsonResult(*itr,"inclusiveBeginKey",shardItem.inclusiveBeginKey); ExtractJsonResult(*itr,"exclusiveEndKey",shardItem.exclusiveEndKey); ret.result.push_back(shardItem); } else { throw LOGException(LOGE_BAD_RESPONSE, string("Invalid json format, value is not json object type\tbad json format:") + httpResponse.content, httpResponse.header[X_LOG_REQUEST_ID], httpResponse.statusCode); } } } return ret; } MergeShardsResponse LOGClient::MergeShard(const string& project, const string& logstore,const int shardId) { if (project.empty() || logstore.empty()) throw LOGException(LOGE_PARAMETER_INVALID, "project or logstore invalid."); string operation = LOGSTORES + string("/") + logstore + SHARDS+string("/") + std::to_string(shardId);; string body = ""; map<string, string> parameterList; parameterList["action"] = "merge"; SetCommonParameter(parameterList); map<string, string> httpHeader; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); httpHeader[CONTENT_TYPE] = ""; HttpMessage httpResponse; SendRequest(project, HTTP_POST, operation, body, parameterList, httpHeader, httpResponse); rapidjson::Document document; ExtractJsonResult(httpResponse.content, document); ListShardsResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; if (document.IsArray()) { for (rapidjson::Value::ConstValueIterator itr = document.Begin(); itr != document.End(); ++itr) { if (itr->IsObject()) { ShardItem shardItem; ExtractJsonResult(*itr, "shardID", shardItem.shardId); ExtractJsonResult(*itr,"status",shardItem.status); ExtractJsonResult(*itr,"inclusiveBeginKey",shardItem.inclusiveBeginKey); ExtractJsonResult(*itr,"exclusiveEndKey",shardItem.exclusiveEndKey); ExtractJsonResult(*itr,"createTime",shardItem.createTime); ret.result.push_back(shardItem); } else { throw LOGException(LOGE_BAD_RESPONSE, string("Invalid json format, value is not json object type\tbad json format:") + httpResponse.content, httpResponse.header[X_LOG_REQUEST_ID], httpResponse.statusCode); } } } return ret; } Response LOGClient::DeleteShard( const std::string& project,const std::string& logstore,const int shardId) { if (project.empty() || logstore.empty()) throw LOGException(LOGE_PARAMETER_INVALID, "project or logstore invalid."); string operation = LOGSTORES + string("/") + logstore + SHARDS+string("/") + std::to_string(shardId);; string body = ""; map<string, string> parameterList; SetCommonParameter(parameterList); map<string, string> httpHeader; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); httpHeader[CONTENT_TYPE] = ""; HttpMessage httpResponse; SendRequest(project, HTTP_DELETE, operation, body, parameterList, httpHeader, httpResponse); Response ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; return ret; } GetPbBatchLogResponse LOGClient::GetPbBatchLog(const string& project, const string& logstore, uint32_t shardId, int count, const string& cursor, const std::string& endCursor) { GetPbBatchLogResponse ret; PullDataResponse res = GetLogGroupList(project, logstore, shardId, count, cursor, endCursor, ret.result.logGroupList); ret.statusCode = res.statusCode; ret.requestId = res.requestId; ret.result.nextCursor = res.cursor; ret.result.logGroupCount = ret.result.logGroupList.logGroupList.size(); return ret; } GetBatchLogResponse LOGClient::GetBatchLog(const string& project, const string& logstore, uint32_t shardId, int count, const string& cursor, const std::string& endCursor) { GetBatchLogResponse ret; pb::LogGroupList logGroupList; PullDataResponse res = GetLogGroupList(project, logstore, shardId, count, cursor, endCursor, logGroupList); ret.statusCode = res.statusCode; ret.requestId = res.requestId; ParseBatchLogData(res.cursor, logGroupList, ret.result); return ret; } PullDataResponse LOGClient::GetLogGroupList(const string& project, const string& logstore, uint32_t shardId, int count, const string& cursor, const std::string& endCursor, pb::LogGroupList& logGroupList) { if (project.empty() || logstore.empty()) throw LOGException(LOGE_PARAMETER_INVALID, "project or logstore invalid."); string operation = LOGSTORES + string("/") + logstore + SHARDS + string("/") + std::to_string(shardId); string body; map<string, string> parameterList; SetCommonParameter(parameterList); parameterList["type"] = "log"; parameterList["cursor"] = cursor; if (!endCursor.empty()) parameterList["end_cursor"] = endCursor; parameterList["count"] = std::to_string(count); map<string, string> httpHeader; httpHeader[ACCEPT_ENCODING] = LOG_LZ4; httpHeader[HTTP_ACCEPT] = TYPE_LOG_PROTOBUF; httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(body.length()); httpHeader[CONTENT_TYPE] = ""; HttpMessage httpResponse; SendRequest(project, HTTP_GET, operation, body, parameterList, httpHeader, httpResponse); int32_t retCode = ParseLogGroupList(atoi(httpResponse.header["x-log-count"].c_str()), atoi(httpResponse.header[X_LOG_BODYRAWSIZE].c_str()), httpResponse.content, logGroupList); if (retCode == 1) { throw LOGException(LOGE_BAD_RESPONSE, "Fail to uncompress data", httpResponse.header[X_LOG_REQUEST_ID], httpResponse.statusCode); } else if (retCode == 2) { throw LOGException(LOGE_BAD_RESPONSE, "Parse LogGroupLddist from string fail", httpResponse.header[X_LOG_REQUEST_ID], httpResponse.statusCode); } PullDataResponse ret; ret.statusCode = httpResponse.statusCode; ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; ret.cursor = httpResponse.header[X_LOG_CURSOR]; return ret; } }