client.h (333 lines of code) (raw):
#ifndef __LOGV6_CLIENT_H__
#define __LOGV6_CLIENT_H__
#include <string>
#include <vector>
#include <map>
#include <utility>
#include <mutex>
#include "RestfulApiCommon.h"
#include "pb.h"
#include "resource.h"
namespace ehttp
{
class HttpController;
class MemHttpRequest;
class MemHttpResponse;
}
namespace google
{
namespace protobuf
{
class Closure;
}
}
typedef class google::protobuf::Closure RPCClosure;
namespace aliyun_log_sdk_v6
{
void ExtractJsonResult(const std::string& response, rapidjson::Document& document);
void JsonMemberCheck(const rapidjson::Value& value, const char* name);
void ExtractJsonResult(const rapidjson::Value& value, const char* name, int32_t& number);
void ExtractJsonResult(const rapidjson::Value& value, const char* name, uint32_t& number);
void ExtractJsonResult(const rapidjson::Value& value, const char* name, int64_t& number);
void ExtractJsonResult(const rapidjson::Value& value, const char* name, uint64_t& number);
void ExtractJsonResult(const rapidjson::Value& value, const char* name, bool& boolean);
void ExtractJsonResult(const rapidjson::Value& value, const char* name, std::string& dst);
const rapidjson::Value& GetJsonValue(const rapidjson::Value& value, const char* name);
/** Struct which is the definition of the log item.
*/
extern const char* const LOG_SDK_IDENTIFICATION;
typedef struct
{
uint32_t timestamp;///<The timestamp of the log item.
std::string source;///<The source of the log item.
std::string topic;
std::vector<std::pair<std::string, std::string> > data;///< The data of the log item, including many key-value pairs.
}LogItem;
typedef struct
{
std::string progress; // "Complete" if the request is full success, other wise the [logdatas] means part of the result
int32_t logline; // the logs lines returned
std::vector<LogItem> logdatas; //the result data returned
}LogResult;
/** struct store the log meta info for every specific time range [beginTime , endTime)
*/
typedef struct
{
uint32_t from; /// the log meta begin time (include)
uint32_t to; /// the log meta end time(exclude)
uint32_t count; /// total log count in [beginTime, endTime)
std::string progress; /// a flag if there is more data not read, if it is set to true, the user can reduce the
/// read time range to get more precise result
}LogMetaItem;
/** total log meta info,
*/
typedef struct
{
uint32_t count; /// total log count found
std::string progress; /// a flag if there is more data not read, if it is set to true, the user can reduce the
/// read time range to get more precise result
std::vector<LogMetaItem> metaItems; /// the log meta for every small time range piece
}LogMeta;
typedef struct BatchLogData
{
std::vector<std::vector<LogItem> > logGroups;
uint32_t logGroupCount;
std::string nextCursor;
}BatchLogData;
typedef struct PbBatchLogData
{
pb::LogGroupList logGroupList;
uint32_t logGroupCount;
std::string nextCursor;
}PbBatchLogData;
typedef struct ShardItem
{
uint32_t shardId;
std::string status;
std::string inclusiveBeginKey ;
std::string exclusiveEndKey;
int createTime;
bool isReadOnly();
} ShardItem;
/**
*LOG Json Exception includes an error code and a detail message.
*/
class JsonException : public std::exception
{
/** Constructor with error code and message.
* @param errorCode LOG error code.
* @param message Detailed information for the exception.
* @return The objcect pointer.
*/
public:
JsonException(const std::string& errorCode, const std::string& message)
:mErrorCode(errorCode),mMessage(message)
{
}
~JsonException() throw()
{
}
/** Function that return error code.
* @param void None.
* @return Error code string.
*/
std::string GetErrorCode(void) const
{
return mErrorCode;
}
/** Function that return error message.
* @param void None.
* @return Error message string.
*/
std::string GetMessage(void) const
{
return mMessage;
}
private:
std::string mErrorCode;
std::string mMessage;
};
/*
* Responses
*/
struct Response
{
int32_t statusCode;
std::string requestId;
};
struct CreateConsumerGroupResponse: public Response
{
};
struct DeleteConsumerGroupResponse: public Response
{
};
struct ListConsumerGroupResponse: public Response
{
std::vector<ConsumerGroup> consumerGroups;
};
struct UpdateCheckpointResponse: public Response
{};
struct ListCheckpointResponse: public Response
{
std::vector<ConsumerGroupCheckpoint> consumerGroupCheckpoints;
};
struct HeartbeatResponse: public Response
{
std::vector<uint32_t> shards;
};
struct PostLogStoreLogsResponse : public Response
{
int32_t bodyBytes;
};
struct GetLogStoreLogsResponse : public Response
{
LogResult result;
};
struct GetLogStoreHistogramResponse : public Response
{
LogMeta result;
};
struct GetLogStoreResponse : public Response
{
LogStore result;
};
struct ListLogStoresResponse : public Response
{
std::vector<std::string> result;
};
struct GetCursorResponse : public Response
{
std::string result;
};
struct ListLogStoreTopicsResponse : public Response
{
std::vector<std::string> result;
};
struct ListShardsResponse : public Response
{
std::vector<ShardItem> result;
};
struct GetBatchLogResponse : public Response
{
BatchLogData result;
};
struct GetPbBatchLogResponse : public Response
{
PbBatchLogData result;
};
struct GetIndexResponse : public Response
{
Index result;
};
struct GetConfigResponse : public Response
{
Config result;
};
struct ListConfigsResponse : public Response
{
std::vector<std::string> result;
};
struct GetMachineGroupResponse : public Response
{
MachineGroup result;
};
struct ListMachineGroupsResponse : public Response
{
std::vector<std::string> result;
};
struct GetAppliedConfigsResponse : public Response
{
std::vector<std::string> result;
};
struct PullDataResponse : public Response
{
std::string cursor;
};
struct LogStoreSqlResponse : public Response
{
int64_t processedRows;
int64_t elapsedMilli;
double cpuSec;
int64_t cpuCore;
LogResult result;
LogStoreSqlResponse() : processedRows(0), elapsedMilli(0), cpuSec(0.0), cpuCore(0) {}
};
typedef LogStoreSqlResponse ProjectSqlResponse;
struct CreateSqlInstanceResponse : public Response
{
};
struct UpdateSqlInstanceResponse : public Response
{
};
struct SqlInstance
{
std::string name;
int cu;
time_t createTime;
time_t updateTime;
SqlInstance() : cu(0), createTime(0), updateTime(0) {}
};
struct ListSqlInstanceResponse : public Response
{
std::vector<SqlInstance> sqlInstances;
};
typedef ListShardsResponse SplitShardResponse ;
typedef ListShardsResponse MergeShardsResponse ;
/** A client which is used to access the LOG service.
*/
class LOGClient
{
public:
/** Constructor needs at least three parameters.
* @param LOGHost LOG service address, for example:http://LOG.aliyun-inc.com.
* @param accessKeyId Aliyun AccessKeyId.
* @param accessKey Aliyun AccessKey Secret.
* @param timeout Timeout time of one operation.
* @param source Source identifier used to differentiate data from different machines. If it is empty, constructor will use machine ip as its source.
* @param compressFlag The flag decides whether compresses the data or not when put data to LOG.
* @return The objcect pointer.
*/
LOGClient(const std::string& slsHost, const std::string& accessKeyId, const std::string& accessKey, int32_t timeout=LOG_REQUEST_TIMEOUT, const std::string& source="", bool compressFlag=true);
LOGClient(const std::string& slsHost, const std::string& accessKeyId, const std::string& accessKey, const std::string& securityToken, int32_t timeout=LOG_REQUEST_TIMEOUT, const std::string& source="", bool compressFlag=true);
~LOGClient();
CreateConsumerGroupResponse CreateConsumerGroup(const std::string& project ,
const std::string& logstore,
const std::string& consumergroup,
const uint32_t timeoutInSec,
const bool inOrder);
DeleteConsumerGroupResponse DeleteConsumerGroup(const std::string& project,
const std::string& logstore,
const std::string& consumergroup);
ListConsumerGroupResponse ListConsumerGroup(const std::string& project,
const std::string& logStore);
UpdateCheckpointResponse UpdateCheckpoint(const std::string& project, const std::string& logstore, const std::string& consumergroup, const int shard, const std::string& checkpoint);
ListCheckpointResponse ListCheckpoint(const std::string& project, const std::string& logstore, const std::string& consumergroup);
HeartbeatResponse ConsumerGroupHeartbeat(const std::string& project, const std::string& logstore, const std::string& consumergroup, const std::string& consumer, const std::vector<uint32_t>& shards);
/** Put data to LOG service. Unsuccessful opertaion will cause an LOGException.
* @param project The project name
* @param logstore The logstore of the data.
* @param topic The topic of the data.
* @param logGroup Log data.
* @return request_id.
*/
PostLogStoreLogsResponse PostLogStoreLogs(const std::string& project , const std::string& logstore, const std::string& topic, const std::vector<LogItem>& logGroup,const std::string& hashKey="");
/** Put data to LOG service. Unsuccessful opertaion will cause an LOGException.
* @param project The project name
* @param logGroup Log data.
* @return request_id.
*/
PostLogStoreLogsResponse PostLogStoreLogs(const std::string& project, const std::string& logstore, const pb::LogGroup& logGroup,const std::string& hashKey="");
/** Get data from LOG service. Unsuccessful opertaion will cause an LOGException.
* @param project The project name
* @param logstore The logstore of the data.
* @param topic The topic of the data.
* @param beginTime Start time for the search.
* @param endTime End time for the search.
* @param reverseFlag Direction of results returned, false:time sequence, true:reversed time sequence.
* @param lines Lines of results returned, if the number of all results is litter than the parameter, actual lines of results will be returned.
* @param offset Start offset of all results, return result will be [offset, offset+lines) of them.
* @param query Filter string to filter the results.
* @return The data specified by the parameters.
*/
GetLogStoreLogsResponse GetLogStoreLogs(const std::string& project, const std::string& logstore, const std::string& topic, time_t beginTime, time_t endTime, bool reverseFlag=false, int32_t lines=10, int32_t offset=0, const std::string& query="");
/** Get the data meta info for the query. It is only used for data indexed in sls.
* @param project The project name
* Unsuccessful opertaion will cause an LOGException
* @param logstore The logstore of the data.
* @param topic The topic of the data.
* @param beginTime Start time for the search.
* @param endTime End time for the search.
* @param query Filter string to filter the results.
* @return The data meta by the parameters.
*/
GetLogStoreHistogramResponse GetLogStoreHistogram(const std::string& project, const std::string& logstore, const std::string& topic, time_t beginTime, time_t endTime, const std::string& query="");
Response CreateLogStore(const std::string& project, const LogStore& logStore);
Response UpdateLogStore(const std::string& project, const LogStore& logStore);
Response DeleteLogStore(const std::string& project, const std::string& logStore);
GetLogStoreResponse GetLogStore(const std::string& project, const std::string& logStore);
/** List all categories of the user. Unsuccessful opertaion will cause an LOGException.
* @param project The project name
* @param offset The list offset
* @param size The list size
* @return All categories of the user.
*/
ListLogStoresResponse ListLogStores(const std::string& project, const std::string& logstorename = "", const int32_t& offset = -1, const int32_t& size = -1);
/** List all topics in a logstore. Unsuccessful opertaion will cause an LOGException.
* @param project The project name
* @param logstore The logstore of the topic.
* @return All topics in a logstore.
*/
ListLogStoreTopicsResponse ListLogStoreTopics(const std::string& project, const std::string& logstore, const int64_t& line, const std::string& nextToken = "");
/** get cursor in a logstore. Unsuccessful opertaion will cause an LOGException.
* @param project The project name
* @param logstore.
* @param shardId
* @param fromTime, unixtimestamp
* @return cursor.
*/
GetCursorResponse GetCursor(const std::string& project, const std::string& logstore, uint32_t shardId, uint32_t fromTime);
/** get cursor in a logstore. Unsuccessful opertaion will cause an LOGException.
* @param project The project name
* @param logstore.
* @param shardId
* @param cursorMode, "begin" or "end"
* @return cursor.
*/
GetCursorResponse GetCursor(const std::string& project, const std::string& logstore, uint32_t shardId, LOG_Cursor_Mode cursorMode);
/** List all shards in a logstore. Unsuccessful opertaion will cause an LOGException.
* @param project The project name
* @param logstore The logstore of the topic.
* @return All shards in a logstore.
*/
ListShardsResponse ListShards(const std::string& project, const std::string& logstore);
/** split a shard in logstore. Unsuccessful opertaion will cause an LOGException.
* @param project The project name
* @param logstore The logstore of the topic.
* @param shardId the shard id
* @param middleHash mmiddle hash
* @return originShard and result two splited shard
*/
SplitShardResponse SplitShard(const std::string& project,const std::string& logstore,const int shardId,const std::string& middleHash);
/** merge two shard in to one shard in logstore. Unsuccessful opertaion will cause an LOGException.
* @param project The project name
* @param logstore The logstore of the topic.
* @param shardId the left shard id
* @return origin tow shards and result merged shard
*/
MergeShardsResponse MergeShard(const std::string& project,const std::string& logstore,const int shardId);
/**
* delete a readonly shard
* @param project The project name
* @param logstore The logstore of the topic.
* @param shardId the left shard id
* @return request_id.
*/
Response DeleteShard( const std::string& project,const std::string& logstore,const int shardId);
/** get package list in a shard. Unsuccessful opertaion will cause an LOGException.
* @param project The project name
* @param logstore
* @param shardId
* @param count, max loggroup count returned
* @param cursor/endCursor, [cursor, endCursor), endCursor set "" means no endCursor defined
* @return All package list in a shard
*/
GetBatchLogResponse GetBatchLog(const std::string& project, const std::string& logstore, uint32_t shardId, int count, const std::string& cursor, const std::string& endCursor = "");
/** get package list in a shard. Unsuccessful opertaion will cause an LOGException.
* @param project The project name
* @param logstore
* @param shardId
* @param count, max loggroup count returned
* @param cursor/endCursor, [cursor, endCursor), endCursor set "" means no endCursor defined
* @return All pb package list in a shard
*/
GetPbBatchLogResponse GetPbBatchLog(const std::string& project, const std::string& logstore, uint32_t shardId, int count, const std::string& cursor, const std::string& endCursor = "");
Response CreateIndex(const std::string& project, const std::string& logStore, const Index& index);
Response UpdateIndex(const std::string& project, const std::string& logStore, const Index& index);
GetIndexResponse GetIndex(const std::string& project, const std::string& logStore);
Response DeleteIndex(const std::string& project, const std::string& logStore);
Response CreateConfig(const std::string& project, const Config& config);
Response UpdateConfig(const std::string& project, const Config& config);
Response DeleteConfig(const std::string& project, const std::string& config);
GetConfigResponse GetConfig(const std::string& project, const std::string& config);
ListConfigsResponse ListConfigs(const std::string& project, const std::string& configName = "", const int32_t& offset = -1, const int32_t& size = -1);
Response CreateMachineGroup(const std::string& project, const MachineGroup& machineGroup);
Response UpdateMachineGroup(const std::string& project, const MachineGroup& machineGroup);
Response DeleteMachineGroup(const std::string& project, const std::string& machineGroup);
GetMachineGroupResponse GetMachineGroup(const std::string& project, const std::string& machineGroup);
ListMachineGroupsResponse ListMachineGroups(const std::string& project, const int32_t& offset = -1, const int32_t& size = -1);
Response ApplyConfigToMachineGroup(const std::string& project, const std::string& machineGroup, const std::string& config);
Response RemoveConfigFromMachineGroup(const std::string& project, const std::string& machineGroup, const std::string& config);
GetAppliedConfigsResponse GetAppliedConfigs(const std::string& project, const std::string& machineGroup);
LogStoreSqlResponse ExecuteLogStoreSql(const std::string &project, const std::string &logstore, time_t beginTime, time_t endTime, const std::string &query, bool powerSql = false);
ProjectSqlResponse ExecuteProjectSql(const std::string &project, const std::string &query, bool powerSql);
CreateSqlInstanceResponse CreateSqlInstance(const std::string &project, int cu);
UpdateSqlInstanceResponse UpdateSqlInstance(const std::string &project, int cu);
ListSqlInstanceResponse ListSqlInstance(const std::string &project);
void SetUserAgent(const std::string& userAgent) {mUserAgent = userAgent;}
void SetAccessKey(const std::string& accessKey);
std::string GetAccessKey();
void SetAccessKeyId(const std::string& accessKeyId);
std::string GetAccessKeyId();
void SetSlsHost(const std::string& slsHost);
std::string GetSlsHost();
std::string GetHostFieldSuffix();
const std::string& GetSecurityToken()
{
return mSecurityToken;
}
void SetSecurityToken(const std::string& securityToken)
{
mSecurityToken = securityToken;
}
void RemoveSecurityToken()
{
SetSecurityToken("");
}
void SetMaxSendSpeed(const int64_t speed){ mMaxSendSpeedInBytePerSec = speed; }
protected:
std::string mSlsHost;
std::string mAccessKeyId;
std::string mAccessKey;
std::string mSecurityToken;
std::string mSource;
bool mCompressFlag;
int32_t mTimeout;
std::string mUserAgent;
std::string mHostFieldSuffix;
bool mIsHostRawIp;
std::mutex mMutex;
//HttpMessage (*mLOGSend)(const std::string& url, const std::map<std::string, std::string>& header, const std::string& body, const LOG_Request_Mode requestMode, int32_t timeout);
std::string (*mGetDateString)();
void (*mLOGSend)(const std::string& httpMethod, const std::string& host, const int32_t port, const std::string& url, const std::string& queryString, const std::map<std::string, std::string>& header, const std::string& body, const int32_t timeout, HttpMessage& httpMessage, const int64_t maxspeed);
int64_t mMaxSendSpeedInBytePerSec;
void SendRequest(const std::string& project, const std::string& httpMethod, const std::string& url, const std::string& body, const std::map<std::string, std::string>& parameterList, std::map<std::string, std::string>& header, HttpMessage& httpMessage);
private:
GetCursorResponse GetCursor(const std::string& project, const std::string& logstore, uint32_t shardId, const std::string& paraKey, const std::string& paraValue);
PullDataResponse GetLogGroupList(const std::string& project, const std::string& logstore, uint32_t shardId, int count, const std::string& cursor, const std::string& endCursor, pb::LogGroupList& logGroupList);
void SetCommonHeader(std::map<std::string, std::string>& httpHeader, int32_t contentLength, const std::string& project="");
void SetCommonParameter(std::map<std::string, std::string>& parameterList);
std::string GetHost(const std::string& project);
};
}
#endif