core/unittest/flusher/FlusherSLSUnittest.cpp (1,791 lines of code) (raw):

// Copyright 2023 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include <memory> #include <random> #include <string> #include "json/json.h" #include "TagConstants.h" #include "app_config/AppConfig.h" #include "collection_pipeline/CollectionPipeline.h" #include "collection_pipeline/CollectionPipelineContext.h" #include "collection_pipeline/queue/ExactlyOnceQueueManager.h" #include "collection_pipeline/queue/ProcessQueueManager.h" #include "collection_pipeline/queue/QueueKeyManager.h" #include "collection_pipeline/queue/SLSSenderQueueItem.h" #include "collection_pipeline/queue/SenderQueueManager.h" #include "common/JsonUtil.h" #include "common/LogtailCommonFlags.h" #include "common/compression/CompressorFactory.h" #include "common/http/Constant.h" #include "plugin/flusher/sls/FlusherSLS.h" #include "plugin/flusher/sls/PackIdManager.h" #include "plugin/flusher/sls/SLSClientManager.h" #include "plugin/flusher/sls/SLSConstant.h" #include "unittest/Unittest.h" #ifdef __ENTERPRISE__ #include "config/provider/EnterpriseConfigProvider.h" #include "unittest/flusher/SLSNetworkRequestMock.h" #endif DECLARE_FLAG_INT32(batch_send_interval); DECLARE_FLAG_INT32(merge_log_count_limit); DECLARE_FLAG_INT32(batch_send_metric_size); DECLARE_FLAG_INT32(max_send_log_group_size); DECLARE_FLAG_DOUBLE(sls_serialize_size_expansion_ratio); DECLARE_FLAG_BOOL(send_prefer_real_ip); DECLARE_FLAG_STRING(default_access_key_id); DECLARE_FLAG_STRING(default_access_key); using namespace std; namespace logtail { class FlusherSLSUnittest : public testing::Test { public: void OnSuccessfulInit(); void OnFailedInit(); void OnPipelineUpdate(); void TestBuildRequest(); void TestSend(); void TestFlush(); void TestFlushAll(); void TestAddPackId(); void OnGoPipelineSend(); protected: static void SetUpTestCase() { #ifdef __ENTERPRISE__ EnterpriseSLSClientManager::GetInstance()->mDoProbeNetwork = ProbeNetworkMock::DoProbeNetwork; EnterpriseSLSClientManager::GetInstance()->mGetEndpointRealIp = GetRealIpMock::GetEndpointRealIp; EnterpriseSLSClientManager::GetInstance()->mGetAccessKeyFromSLS = GetAccessKeyMock::DoGetAccessKey; #endif } void SetUp() override { ctx.SetConfigName("test_config"); ctx.SetPipeline(pipeline); } void TearDown() override { PackIdManager::GetInstance()->mPackIdSeq.clear(); QueueKeyManager::GetInstance()->Clear(); SenderQueueManager::GetInstance()->Clear(); ExactlyOnceQueueManager::GetInstance()->Clear(); #ifdef __ENTERPRISE__ EnterpriseSLSClientManager::GetInstance()->Clear(); #endif } private: CollectionPipeline pipeline; CollectionPipelineContext ctx; }; void FlusherSLSUnittest::OnSuccessfulInit() { unique_ptr<FlusherSLS> flusher; Json::Value configJson, optionalGoPipelineJson, optionalGoPipeline; string configStr, optionalGoPipelineStr, errorMsg; // only mandatory param configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore" } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); #ifndef __ENTERPRISE__ configJson["Endpoint"] = "test_region.log.aliyuncs.com"; #endif flusher.reset(new FlusherSLS()); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); APSARA_TEST_TRUE(optionalGoPipeline.isNull()); APSARA_TEST_EQUAL("test_project", flusher->mProject); APSARA_TEST_EQUAL("test_logstore", flusher->mLogstore); APSARA_TEST_EQUAL(STRING_FLAG(default_region_name), flusher->mRegion); #ifndef __ENTERPRISE__ APSARA_TEST_EQUAL("test_region.log.aliyuncs.com", flusher->mEndpoint); #endif APSARA_TEST_EQUAL("", flusher->mAliuid); APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS, flusher->mTelemetryType); APSARA_TEST_TRUE(flusher->mShardHashKeys.empty()); APSARA_TEST_EQUAL(static_cast<uint32_t>(INT32_FLAG(merge_log_count_limit)), flusher->mBatcher.GetEventFlushStrategy().GetMinCnt()); APSARA_TEST_EQUAL( static_cast<uint32_t>(INT32_FLAG(max_send_log_group_size) / DOUBLE_FLAG(sls_serialize_size_expansion_ratio)), flusher->mBatcher.GetEventFlushStrategy().GetMaxSizeBytes()); APSARA_TEST_EQUAL(static_cast<uint32_t>(INT32_FLAG(batch_send_metric_size)), flusher->mBatcher.GetEventFlushStrategy().GetMinSizeBytes()); uint32_t timeout = static_cast<uint32_t>(INT32_FLAG(batch_send_interval)) / 2; APSARA_TEST_EQUAL(static_cast<uint32_t>(INT32_FLAG(batch_send_interval)) - timeout, flusher->mBatcher.GetEventFlushStrategy().GetTimeoutSecs()); APSARA_TEST_TRUE(flusher->mBatcher.GetGroupFlushStrategy().has_value()); APSARA_TEST_EQUAL(static_cast<uint32_t>(INT32_FLAG(batch_send_metric_size)), flusher->mBatcher.GetGroupFlushStrategy()->GetMinSizeBytes()); APSARA_TEST_EQUAL(timeout, flusher->mBatcher.GetGroupFlushStrategy()->GetTimeoutSecs()); APSARA_TEST_TRUE(flusher->mGroupSerializer); APSARA_TEST_TRUE(flusher->mGroupListSerializer); APSARA_TEST_EQUAL(CompressType::LZ4, flusher->mCompressor->GetCompressType()); APSARA_TEST_EQUAL(QueueKeyManager::GetInstance()->GetKey("test_config-flusher_sls-test_project#test_logstore"), flusher->GetQueueKey()); auto que = SenderQueueManager::GetInstance()->GetQueue(flusher->GetQueueKey()); APSARA_TEST_NOT_EQUAL(nullptr, que); APSARA_TEST_FALSE(que->GetRateLimiter().has_value()); APSARA_TEST_EQUAL(3U, que->GetConcurrencyLimiters().size()); // valid optional param configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_region.log.aliyuncs.com", "Aliuid": "123456789", "TelemetryType": "logs", "ShardHashKeys": [ "__source__" ] } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); #ifndef __ENTERPRISE__ configJson["EndpointMode"] = "default"; #endif flusher.reset(new FlusherSLS()); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); APSARA_TEST_EQUAL("test_region", flusher->mRegion); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("123456789", flusher->mAliuid); APSARA_TEST_EQUAL(EndpointMode::DEFAULT, flusher->mEndpointMode); #else APSARA_TEST_EQUAL("", flusher->mAliuid); #endif APSARA_TEST_EQUAL("test_region.log.aliyuncs.com", flusher->mEndpoint); APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS, flusher->mTelemetryType); APSARA_TEST_EQUAL(1U, flusher->mShardHashKeys.size()); APSARA_TEST_EQUAL("__source__", flusher->mShardHashKeys[0]); SenderQueueManager::GetInstance()->Clear(); // invalid optional param configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": true, "Aliuid": true, "TelemetryType": true, "ShardHashKeys": true } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); #ifdef __ENTERPRISE__ configJson["EndpointMode"] = true; configJson["Endpoint"] = true; #else configJson["Endpoint"] = "test_region.log.aliyuncs.com"; #endif flusher.reset(new FlusherSLS()); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); APSARA_TEST_EQUAL(STRING_FLAG(default_region_name), flusher->mRegion); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(EndpointMode::DEFAULT, flusher->mEndpointMode); APSARA_TEST_EQUAL("", flusher->mEndpoint); #endif APSARA_TEST_EQUAL("", flusher->mAliuid); APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS, flusher->mTelemetryType); APSARA_TEST_TRUE(flusher->mShardHashKeys.empty()); SenderQueueManager::GetInstance()->Clear(); #ifdef __ENTERPRISE__ // region EnterpriseConfigProvider::GetInstance()->mIsPrivateCloud = true; configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_region.log.aliyuncs.com" } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); flusher.reset(new FlusherSLS()); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); APSARA_TEST_EQUAL(STRING_FLAG(default_region_name), flusher->mRegion); EnterpriseConfigProvider::GetInstance()->mIsPrivateCloud = false; SenderQueueManager::GetInstance()->Clear(); #endif #ifdef __ENTERPRISE__ // EndpointMode && Endpoint EnterpriseSLSClientManager::GetInstance()->Clear(); // Endpoint ignored in accelerate mode configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "EndpointMode": "accelerate", "Endpoint": " test_region.log.aliyuncs.com " } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); flusher.reset(new FlusherSLS()); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); APSARA_TEST_EQUAL(EndpointMode::ACCELERATE, flusher->mEndpointMode); APSARA_TEST_EQUAL(EnterpriseSLSClientManager::GetInstance()->mRegionCandidateEndpointsMap.end(), EnterpriseSLSClientManager::GetInstance()->mRegionCandidateEndpointsMap.find("test_region")); APSARA_TEST_EQUAL(flusher->mProject, flusher->mCandidateHostsInfo->GetProject()); APSARA_TEST_EQUAL(flusher->mRegion, flusher->mCandidateHostsInfo->GetRegion()); APSARA_TEST_EQUAL(EndpointMode::ACCELERATE, flusher->mCandidateHostsInfo->GetMode()); SenderQueueManager::GetInstance()->Clear(); // region remote endpoints not existed configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "EndpointMode": "unknown", "Endpoint": " test_region.log.aliyuncs.com " } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); flusher.reset(new FlusherSLS()); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); APSARA_TEST_EQUAL(EndpointMode::DEFAULT, flusher->mEndpointMode); auto& endpoints = EnterpriseSLSClientManager::GetInstance()->mRegionCandidateEndpointsMap["test_region"].mRemoteEndpoints; APSARA_TEST_EQUAL(1U, endpoints.size()); APSARA_TEST_EQUAL("test_region.log.aliyuncs.com", endpoints[0]); APSARA_TEST_EQUAL(flusher->mProject, flusher->mCandidateHostsInfo->GetProject()); APSARA_TEST_EQUAL(flusher->mRegion, flusher->mCandidateHostsInfo->GetRegion()); APSARA_TEST_EQUAL(EndpointMode::DEFAULT, flusher->mCandidateHostsInfo->GetMode()); SenderQueueManager::GetInstance()->Clear(); // region remote endpoints existed configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "EndpointMode": "default", "Endpoint": " test_region-intranet.log.aliyuncs.com " } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); flusher.reset(new FlusherSLS()); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); APSARA_TEST_EQUAL(EndpointMode::DEFAULT, flusher->mEndpointMode); endpoints = EnterpriseSLSClientManager::GetInstance()->mRegionCandidateEndpointsMap["test_region"].mRemoteEndpoints; APSARA_TEST_EQUAL(2U, endpoints.size()); APSARA_TEST_EQUAL("test_region.log.aliyuncs.com", endpoints[0]); APSARA_TEST_EQUAL("test_region-intranet.log.aliyuncs.com", endpoints[1]); APSARA_TEST_EQUAL(flusher->mProject, flusher->mCandidateHostsInfo->GetProject()); APSARA_TEST_EQUAL(flusher->mRegion, flusher->mCandidateHostsInfo->GetRegion()); APSARA_TEST_EQUAL(EndpointMode::DEFAULT, flusher->mCandidateHostsInfo->GetMode()); SenderQueueManager::GetInstance()->Clear(); #endif #ifndef __ENTERPRISE__ // Endpoint configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": " test_region.log.aliyuncs.com " } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); flusher.reset(new FlusherSLS()); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); APSARA_TEST_EQUAL("test_region.log.aliyuncs.com", flusher->mEndpoint); SenderQueueManager::GetInstance()->Clear(); #endif // TelemetryType configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_region.log.aliyuncs.com", "TelemetryType": "metrics" } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); flusher.reset(new FlusherSLS()); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_METRICS, flusher->mTelemetryType); SenderQueueManager::GetInstance()->Clear(); configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_region.log.aliyuncs.com", "TelemetryType": "unknown" } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); flusher.reset(new FlusherSLS()); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS, flusher->mTelemetryType); SenderQueueManager::GetInstance()->Clear(); // ShardHashKeys configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_region.log.aliyuncs.com", "ShardHashKeys": [ "__source__" ] } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); flusher.reset(new FlusherSLS()); ctx.SetExactlyOnceFlag(true); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); APSARA_TEST_TRUE(flusher->mShardHashKeys.empty()); ctx.SetExactlyOnceFlag(false); SenderQueueManager::GetInstance()->Clear(); // group batch && sender queue configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_region.log.aliyuncs.com", "ShardHashKeys": [ "__source__" ] } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); flusher.reset(new FlusherSLS()); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); APSARA_TEST_FALSE(flusher->mBatcher.GetGroupFlushStrategy().has_value()); SenderQueueManager::GetInstance()->Clear(); configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_region.log.aliyuncs.com" } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); flusher.reset(new FlusherSLS()); ctx.SetExactlyOnceFlag(true); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); APSARA_TEST_FALSE(flusher->mBatcher.GetGroupFlushStrategy().has_value()); APSARA_TEST_EQUAL(nullptr, SenderQueueManager::GetInstance()->GetQueue(flusher->GetQueueKey())); ctx.SetExactlyOnceFlag(false); SenderQueueManager::GetInstance()->Clear(); configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_region.log.aliyuncs.com", "TelemetryType": "metrics" } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); flusher.reset(new FlusherSLS()); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); APSARA_TEST_FALSE(flusher->mBatcher.GetGroupFlushStrategy().has_value()); SenderQueueManager::GetInstance()->Clear(); // apm std::vector<std::string> apmConfigStr = {R"( { "Type": "flusher_sls", "TelemetryType": "arms_traces", "Project": "test_project", "Region": "test_region", "Endpoint": "test_endpoint", "Match": { "Type": "tag", "Key": "data_type", "Value": "trace" } } )", R"( { "Type": "flusher_sls", "TelemetryType": "arms_metrics", "Project": "test_project", "Region": "test_region", "Endpoint": "test_endpoint", "Match": { "Type": "tag", "Key": "data_type", "Value": "metric" } } )", R"( { "Type": "flusher_sls", "TelemetryType": "arms_agentinfo", "Project": "test_project", "Region": "test_region", "Endpoint": "test_endpoint", "Match": { "Type": "tag", "Key": "data_type", "Value": "agent_info" } } )"}; std::vector<std::string> apmSubpath = {APM_TRACES_URL, APM_METRICS_URL, APM_AGENTINFOS_URL}; std::vector<sls_logs::SlsTelemetryType> apmTelemetryTypes = { sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_TRACES, sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_METRICS, sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_AGENTINFOS, }; for (size_t ii = 0; ii < apmConfigStr.size(); ii++) { auto& cfg = apmConfigStr[ii]; APSARA_TEST_TRUE(ParseJsonTable(cfg, configJson, errorMsg)); flusher.reset(new FlusherSLS()); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); APSARA_TEST_EQUAL(flusher->mSubpath, apmSubpath[ii]); APSARA_TEST_EQUAL(flusher->mTelemetryType, apmTelemetryTypes[ii]); SenderQueueManager::GetInstance()->Clear(); } // go param ctx.SetIsFlushingThroughGoPipelineFlag(true); configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "cn-hangzhou", "Endpoint": "cn-hangzhou.log.aliyuncs.com", } )"; optionalGoPipelineStr = R"( { "flushers": [ { "type": "flusher_sls/4", "detail": {} } ] } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(optionalGoPipelineStr, optionalGoPipelineJson, errorMsg)); pipeline.mPluginID.store(4); flusher.reset(new FlusherSLS()); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); APSARA_TEST_TRUE(optionalGoPipelineJson == optionalGoPipeline); optionalGoPipeline.clear(); configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_region.log.aliyuncs.com", "EnableShardHash": false } )"; optionalGoPipelineStr = R"( { "flushers": [ { "type": "flusher_sls/4", "detail": { "EnableShardHash": false } } ] } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); APSARA_TEST_TRUE(ParseJsonTable(optionalGoPipelineStr, optionalGoPipelineJson, errorMsg)); pipeline.mPluginID.store(4); flusher.reset(new FlusherSLS()); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); APSARA_TEST_EQUAL(optionalGoPipelineJson.toStyledString(), optionalGoPipeline.toStyledString()); ctx.SetIsFlushingThroughGoPipelineFlag(false); SenderQueueManager::GetInstance()->Clear(); } void FlusherSLSUnittest::OnFailedInit() { unique_ptr<FlusherSLS> flusher; Json::Value configJson, optionalGoPipeline; string configStr, errorMsg; // invalid Project configStr = R"( { "Type": "flusher_sls", "Logstore": "test_logstore", "Endpoint": "test_region.log.aliyuncs.com" } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); flusher.reset(new FlusherSLS()); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_FALSE(flusher->Init(configJson, optionalGoPipeline)); configStr = R"( { "Type": "flusher_sls", "Project": true, "Logstore": "test_logstore", "Endpoint": "test_region.log.aliyuncs.com" } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); flusher.reset(new FlusherSLS()); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_FALSE(flusher->Init(configJson, optionalGoPipeline)); // invalid Logstore configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Endpoint": "test_region.log.aliyuncs.com" } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); flusher.reset(new FlusherSLS()); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_FALSE(flusher->Init(configJson, optionalGoPipeline)); configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": true, "Endpoint": "test_region.log.aliyuncs.com" } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); flusher.reset(new FlusherSLS()); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_FALSE(flusher->Init(configJson, optionalGoPipeline)); #ifndef __ENTERPRISE__ // invalid Endpoint configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore" } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); flusher.reset(new FlusherSLS()); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_FALSE(flusher->Init(configJson, optionalGoPipeline)); configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Endpoint": true } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); flusher.reset(new FlusherSLS()); flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_FALSE(flusher->Init(configJson, optionalGoPipeline)); #endif } void FlusherSLSUnittest::OnPipelineUpdate() { CollectionPipelineContext ctx1; ctx1.SetConfigName("test_config_1"); Json::Value configJson, optionalGoPipeline; FlusherSLS flusher1; flusher1.SetContext(ctx1); flusher1.SetMetricsRecordRef(FlusherSLS::sName, "1"); string configStr, errorMsg; configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_region.log.aliyuncs.com" } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); APSARA_TEST_TRUE(flusher1.Init(configJson, optionalGoPipeline)); APSARA_TEST_TRUE(flusher1.Start()); APSARA_TEST_EQUAL(1U, FlusherSLS::sProjectRefCntMap.size()); { CollectionPipelineContext ctx2; ctx2.SetConfigName("test_config_2"); FlusherSLS flusher2; flusher2.SetContext(ctx2); flusher2.SetMetricsRecordRef(FlusherSLS::sName, "1"); configStr = R"( { "Type": "flusher_sls", "Project": "test_project_2", "Logstore": "test_logstore_2", "Region": "test_region", "Endpoint": "test_region.log.aliyuncs.com", "Aliuid": "123456789" } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); APSARA_TEST_TRUE(flusher2.Init(configJson, optionalGoPipeline)); APSARA_TEST_TRUE(flusher1.Stop(false)); APSARA_TEST_TRUE(FlusherSLS::sProjectRefCntMap.empty()); APSARA_TEST_TRUE(SenderQueueManager::GetInstance()->IsQueueMarkedDeleted(flusher1.GetQueueKey())); APSARA_TEST_TRUE(flusher2.Start()); APSARA_TEST_EQUAL(1U, FlusherSLS::sProjectRefCntMap.size()); APSARA_TEST_TRUE(SenderQueueManager::GetInstance()->IsQueueMarkedDeleted(flusher1.GetQueueKey())); APSARA_TEST_FALSE(SenderQueueManager::GetInstance()->IsQueueMarkedDeleted(flusher2.GetQueueKey())); flusher2.Stop(true); flusher1.Start(); } { CollectionPipelineContext ctx2; ctx2.SetConfigName("test_config_1"); FlusherSLS flusher2; flusher2.SetContext(ctx2); flusher2.SetMetricsRecordRef(FlusherSLS::sName, "1"); configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_region.log.aliyuncs.com", "Aliuid": "123456789" } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); APSARA_TEST_TRUE(flusher2.Init(configJson, optionalGoPipeline)); APSARA_TEST_TRUE(flusher1.Stop(false)); APSARA_TEST_TRUE(SenderQueueManager::GetInstance()->IsQueueMarkedDeleted(flusher1.GetQueueKey())); APSARA_TEST_TRUE(flusher2.Start()); APSARA_TEST_FALSE(SenderQueueManager::GetInstance()->IsQueueMarkedDeleted(flusher1.GetQueueKey())); APSARA_TEST_FALSE(SenderQueueManager::GetInstance()->IsQueueMarkedDeleted(flusher2.GetQueueKey())); flusher2.Stop(true); } } void FlusherSLSUnittest::TestBuildRequest() { #ifdef __ENTERPRISE__ EnterpriseSLSClientManager::GetInstance()->UpdateLocalRegionEndpointsAndHttpsInfo("test_region", {kAccelerationDataEndpoint}); EnterpriseSLSClientManager::GetInstance()->UpdateRemoteRegionEndpoints( "test_region", {"test_region-intranet.log.aliyuncs.com", "test_region.log.aliyuncs.com"}); EnterpriseSLSClientManager::GetInstance()->UpdateRemoteRegionEndpoints( "test_region-b", {"test_region-b-intranet.log.aliyuncs.com", "test_region-b.log.aliyuncs.com"}); #endif Json::Value configJson, optionalGoPipeline; string errorMsg; string configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region-b", "Aliuid": "1234567890", "Endpoint": "test_endpoint" } )"; APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); FlusherSLS flusher; flusher.SetContext(ctx); flusher.SetMetricsRecordRef(FlusherSLS::sName, "1"); APSARA_TEST_TRUE(flusher.Init(configJson, optionalGoPipeline)); string body = "hello, world!"; string bodyLenStr = to_string(body.size()); uint32_t rawSize = 100; string rawSizeStr = "100"; SLSSenderQueueItem item("hello, world!", rawSize, &flusher, flusher.GetQueueKey(), flusher.mLogstore); unique_ptr<HttpSinkRequest> req; bool keepItem = false; string errMsg; #ifdef __ENTERPRISE__ { // empty ak, first try APSARA_TEST_FALSE(flusher.BuildRequest(&item, req, &keepItem, &errMsg)); // empty ak, second try APSARA_TEST_FALSE(flusher.BuildRequest(&item, req, &keepItem, &errMsg)); APSARA_TEST_EQUAL(nullptr, req); APSARA_TEST_TRUE(keepItem); } EnterpriseSLSClientManager::GetInstance()->SetAccessKey( "1234567890", SLSClientManager::AuthType::ANONYMOUS, "test_ak", "test_sk"); { // no available host, uninitialized APSARA_TEST_FALSE(flusher.BuildRequest(&item, req, &keepItem, &errMsg)); APSARA_TEST_EQUAL(nullptr, req); APSARA_TEST_TRUE(keepItem); APSARA_TEST_EQUAL(static_cast<uint32_t>(AppConfig::GetInstance()->GetSendRequestConcurrency()), FlusherSLS::GetRegionConcurrencyLimiter(flusher.mRegion)->GetCurrentLimit()); } { // no available host, initialized flusher.mCandidateHostsInfo->SetInitialized(); APSARA_TEST_FALSE(flusher.BuildRequest(&item, req, &keepItem, &errMsg)); APSARA_TEST_EQUAL(nullptr, req); APSARA_TEST_TRUE(keepItem); APSARA_TEST_EQUAL(static_cast<uint32_t>(AppConfig::GetInstance()->GetSendRequestConcurrency()), FlusherSLS::GetRegionConcurrencyLimiter(flusher.mRegion)->GetCurrentLimit()); } EnterpriseSLSClientManager::GetInstance()->UpdateHostLatency("test_project", EndpointMode::DEFAULT, "test_project.test_region-b.log.aliyuncs.com", chrono::milliseconds(100)); flusher.mCandidateHostsInfo->SelectBestHost(); #endif // log telemetry type { // normal SLSSenderQueueItem item("hello, world!", rawSize, &flusher, flusher.GetQueueKey(), flusher.mLogstore); APSARA_TEST_TRUE(flusher.BuildRequest(&item, req, &keepItem, &errMsg)); APSARA_TEST_EQUAL(HTTP_POST, req->mMethod); #ifdef __ENTERPRISE__ APSARA_TEST_FALSE(req->mHTTPSFlag); #else APSARA_TEST_TRUE(req->mHTTPSFlag); #endif APSARA_TEST_EQUAL("/logstores/test_logstore/shards/lb", req->mUrl); APSARA_TEST_EQUAL("", req->mQueryString); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(12U, req->mHeader.size()); #else APSARA_TEST_EQUAL(11U, req->mHeader.size()); #endif #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", req->mHeader[HOST]); #else APSARA_TEST_EQUAL("test_project.test_endpoint", req->mHeader[HOST]); #endif APSARA_TEST_EQUAL(SLSClientManager::GetInstance()->GetUserAgent(), req->mHeader[USER_AGENT]); APSARA_TEST_FALSE(req->mHeader[DATE].empty()); APSARA_TEST_EQUAL(TYPE_LOG_PROTOBUF, req->mHeader[CONTENT_TYPE]); APSARA_TEST_EQUAL(bodyLenStr, req->mHeader[CONTENT_LENGTH]); APSARA_TEST_EQUAL(CalcMD5(req->mBody), req->mHeader[CONTENT_MD5]); APSARA_TEST_EQUAL(LOG_API_VERSION, req->mHeader[X_LOG_APIVERSION]); APSARA_TEST_EQUAL(HMAC_SHA1, req->mHeader[X_LOG_SIGNATUREMETHOD]); APSARA_TEST_EQUAL("lz4", req->mHeader[X_LOG_COMPRESSTYPE]); APSARA_TEST_EQUAL(rawSizeStr, req->mHeader[X_LOG_BODYRAWSIZE]); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(MD5_SHA1_SALT_KEYPROVIDER, req->mHeader[X_LOG_KEYPROVIDER]); #endif APSARA_TEST_FALSE(req->mHeader[AUTHORIZATION].empty()); APSARA_TEST_EQUAL(body, req->mBody); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", req->mHost); #else APSARA_TEST_EQUAL("test_project.test_endpoint", req->mHost); #endif #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(80, req->mPort); #else APSARA_TEST_EQUAL(443, req->mPort); #endif APSARA_TEST_EQUAL(static_cast<uint32_t>(INT32_FLAG(default_http_request_timeout_sec)), req->mTimeout); APSARA_TEST_EQUAL(1U, req->mMaxTryCnt); APSARA_TEST_FALSE(req->mFollowRedirects); APSARA_TEST_EQUAL(&item, req->mItem); APSARA_TEST_FALSE(item.mRealIpFlag); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", item.mCurrentHost); #else APSARA_TEST_EQUAL("test_project.test_endpoint", item.mCurrentHost); #endif } // arms_traces telemetry type { flusher.mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES; flusher.mSubpath = APM_TRACES_URL; // normal SLSSenderQueueItem item("hello, world!", rawSize, &flusher, flusher.GetQueueKey(), flusher.mLogstore); APSARA_TEST_TRUE(flusher.BuildRequest(&item, req, &keepItem, &errMsg)); APSARA_TEST_EQUAL(HTTP_POST, req->mMethod); #ifdef __ENTERPRISE__ APSARA_TEST_FALSE(req->mHTTPSFlag); #else APSARA_TEST_TRUE(req->mHTTPSFlag); #endif APSARA_TEST_EQUAL(APM_TRACES_URL, req->mUrl); APSARA_TEST_EQUAL("", req->mQueryString); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(12U, req->mHeader.size()); #else APSARA_TEST_EQUAL(11U, req->mHeader.size()); #endif #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", req->mHeader[HOST]); #else APSARA_TEST_EQUAL("test_project.test_endpoint", req->mHeader[HOST]); #endif APSARA_TEST_EQUAL(SLSClientManager::GetInstance()->GetUserAgent(), req->mHeader[USER_AGENT]); APSARA_TEST_FALSE(req->mHeader[DATE].empty()); APSARA_TEST_EQUAL(TYPE_LOG_PROTOBUF, req->mHeader[CONTENT_TYPE]); APSARA_TEST_EQUAL(bodyLenStr, req->mHeader[CONTENT_LENGTH]); APSARA_TEST_EQUAL(CalcMD5(req->mBody), req->mHeader[CONTENT_MD5]); APSARA_TEST_EQUAL(LOG_API_VERSION, req->mHeader[X_LOG_APIVERSION]); APSARA_TEST_EQUAL(HMAC_SHA1, req->mHeader[X_LOG_SIGNATUREMETHOD]); APSARA_TEST_EQUAL("lz4", req->mHeader[X_LOG_COMPRESSTYPE]); APSARA_TEST_EQUAL(rawSizeStr, req->mHeader[X_LOG_BODYRAWSIZE]); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(MD5_SHA1_SALT_KEYPROVIDER, req->mHeader[X_LOG_KEYPROVIDER]); #endif APSARA_TEST_FALSE(req->mHeader[AUTHORIZATION].empty()); APSARA_TEST_EQUAL(body, req->mBody); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", req->mHost); #else APSARA_TEST_EQUAL("test_project.test_endpoint", req->mHost); #endif #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(80, req->mPort); #else APSARA_TEST_EQUAL(443, req->mPort); #endif APSARA_TEST_EQUAL(static_cast<uint32_t>(INT32_FLAG(default_http_request_timeout_sec)), req->mTimeout); APSARA_TEST_EQUAL(1U, req->mMaxTryCnt); APSARA_TEST_FALSE(req->mFollowRedirects); APSARA_TEST_EQUAL(&item, req->mItem); APSARA_TEST_FALSE(item.mRealIpFlag); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", item.mCurrentHost); #else APSARA_TEST_EQUAL("test_project.test_endpoint", item.mCurrentHost); #endif } // arms_metrics telemetry type { flusher.mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS; flusher.mSubpath = APM_METRICS_URL; // normal SLSSenderQueueItem item("hello, world!", rawSize, &flusher, flusher.GetQueueKey(), flusher.mLogstore); APSARA_TEST_TRUE(flusher.BuildRequest(&item, req, &keepItem, &errMsg)); APSARA_TEST_EQUAL(HTTP_POST, req->mMethod); #ifdef __ENTERPRISE__ APSARA_TEST_FALSE(req->mHTTPSFlag); #else APSARA_TEST_TRUE(req->mHTTPSFlag); #endif APSARA_TEST_EQUAL(APM_METRICS_URL, req->mUrl); APSARA_TEST_EQUAL("", req->mQueryString); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(12U, req->mHeader.size()); #else APSARA_TEST_EQUAL(11U, req->mHeader.size()); #endif #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", req->mHeader[HOST]); #else APSARA_TEST_EQUAL("test_project.test_endpoint", req->mHeader[HOST]); #endif APSARA_TEST_EQUAL(SLSClientManager::GetInstance()->GetUserAgent(), req->mHeader[USER_AGENT]); APSARA_TEST_FALSE(req->mHeader[DATE].empty()); APSARA_TEST_EQUAL(TYPE_LOG_PROTOBUF, req->mHeader[CONTENT_TYPE]); APSARA_TEST_EQUAL(bodyLenStr, req->mHeader[CONTENT_LENGTH]); APSARA_TEST_EQUAL(CalcMD5(req->mBody), req->mHeader[CONTENT_MD5]); APSARA_TEST_EQUAL(LOG_API_VERSION, req->mHeader[X_LOG_APIVERSION]); APSARA_TEST_EQUAL(HMAC_SHA1, req->mHeader[X_LOG_SIGNATUREMETHOD]); APSARA_TEST_EQUAL("lz4", req->mHeader[X_LOG_COMPRESSTYPE]); APSARA_TEST_EQUAL(rawSizeStr, req->mHeader[X_LOG_BODYRAWSIZE]); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(MD5_SHA1_SALT_KEYPROVIDER, req->mHeader[X_LOG_KEYPROVIDER]); #endif APSARA_TEST_FALSE(req->mHeader[AUTHORIZATION].empty()); APSARA_TEST_EQUAL(body, req->mBody); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", req->mHost); #else APSARA_TEST_EQUAL("test_project.test_endpoint", req->mHost); #endif #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(80, req->mPort); #else APSARA_TEST_EQUAL(443, req->mPort); #endif APSARA_TEST_EQUAL(static_cast<uint32_t>(INT32_FLAG(default_http_request_timeout_sec)), req->mTimeout); APSARA_TEST_EQUAL(1U, req->mMaxTryCnt); APSARA_TEST_FALSE(req->mFollowRedirects); APSARA_TEST_EQUAL(&item, req->mItem); APSARA_TEST_FALSE(item.mRealIpFlag); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", item.mCurrentHost); #else APSARA_TEST_EQUAL("test_project.test_endpoint", item.mCurrentHost); #endif } // arms_agentinfo telemetry type { flusher.mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS; flusher.mSubpath = APM_AGENTINFOS_URL; // normal SLSSenderQueueItem item("hello, world!", rawSize, &flusher, flusher.GetQueueKey(), flusher.mLogstore); APSARA_TEST_TRUE(flusher.BuildRequest(&item, req, &keepItem, &errMsg)); APSARA_TEST_EQUAL(HTTP_POST, req->mMethod); #ifdef __ENTERPRISE__ APSARA_TEST_FALSE(req->mHTTPSFlag); #else APSARA_TEST_TRUE(req->mHTTPSFlag); #endif APSARA_TEST_EQUAL(APM_AGENTINFOS_URL, req->mUrl); APSARA_TEST_EQUAL("", req->mQueryString); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(12U, req->mHeader.size()); #else APSARA_TEST_EQUAL(11U, req->mHeader.size()); #endif #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", req->mHeader[HOST]); #else APSARA_TEST_EQUAL("test_project.test_endpoint", req->mHeader[HOST]); #endif APSARA_TEST_EQUAL(SLSClientManager::GetInstance()->GetUserAgent(), req->mHeader[USER_AGENT]); APSARA_TEST_FALSE(req->mHeader[DATE].empty()); APSARA_TEST_EQUAL(TYPE_LOG_PROTOBUF, req->mHeader[CONTENT_TYPE]); APSARA_TEST_EQUAL(bodyLenStr, req->mHeader[CONTENT_LENGTH]); APSARA_TEST_EQUAL(CalcMD5(req->mBody), req->mHeader[CONTENT_MD5]); APSARA_TEST_EQUAL(LOG_API_VERSION, req->mHeader[X_LOG_APIVERSION]); APSARA_TEST_EQUAL(HMAC_SHA1, req->mHeader[X_LOG_SIGNATUREMETHOD]); APSARA_TEST_EQUAL("lz4", req->mHeader[X_LOG_COMPRESSTYPE]); APSARA_TEST_EQUAL(rawSizeStr, req->mHeader[X_LOG_BODYRAWSIZE]); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(MD5_SHA1_SALT_KEYPROVIDER, req->mHeader[X_LOG_KEYPROVIDER]); #endif APSARA_TEST_FALSE(req->mHeader[AUTHORIZATION].empty()); APSARA_TEST_EQUAL(body, req->mBody); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", req->mHost); #else APSARA_TEST_EQUAL("test_project.test_endpoint", req->mHost); #endif #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(80, req->mPort); #else APSARA_TEST_EQUAL(443, req->mPort); #endif APSARA_TEST_EQUAL(static_cast<uint32_t>(INT32_FLAG(default_http_request_timeout_sec)), req->mTimeout); APSARA_TEST_EQUAL(1U, req->mMaxTryCnt); APSARA_TEST_FALSE(req->mFollowRedirects); APSARA_TEST_EQUAL(&item, req->mItem); APSARA_TEST_FALSE(item.mRealIpFlag); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", item.mCurrentHost); #else APSARA_TEST_EQUAL("test_project.test_endpoint", item.mCurrentHost); #endif } flusher.mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_LOGS; { // event group list SLSSenderQueueItem item("hello, world!", rawSize, &flusher, flusher.GetQueueKey(), flusher.mLogstore, RawDataType::EVENT_GROUP_LIST); APSARA_TEST_TRUE(flusher.BuildRequest(&item, req, &keepItem, &errMsg)); APSARA_TEST_EQUAL(HTTP_POST, req->mMethod); #ifdef __ENTERPRISE__ APSARA_TEST_FALSE(req->mHTTPSFlag); #else APSARA_TEST_TRUE(req->mHTTPSFlag); #endif APSARA_TEST_EQUAL("/logstores/test_logstore/shards/lb", req->mUrl); APSARA_TEST_EQUAL("", req->mQueryString); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(13U, req->mHeader.size()); #else APSARA_TEST_EQUAL(12U, req->mHeader.size()); #endif #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", req->mHeader[HOST]); #else APSARA_TEST_EQUAL("test_project.test_endpoint", req->mHeader[HOST]); #endif APSARA_TEST_EQUAL(SLSClientManager::GetInstance()->GetUserAgent(), req->mHeader[USER_AGENT]); APSARA_TEST_FALSE(req->mHeader[DATE].empty()); APSARA_TEST_EQUAL(TYPE_LOG_PROTOBUF, req->mHeader[CONTENT_TYPE]); APSARA_TEST_EQUAL(bodyLenStr, req->mHeader[CONTENT_LENGTH]); APSARA_TEST_EQUAL(CalcMD5(req->mBody), req->mHeader[CONTENT_MD5]); APSARA_TEST_EQUAL(LOG_API_VERSION, req->mHeader[X_LOG_APIVERSION]); APSARA_TEST_EQUAL(HMAC_SHA1, req->mHeader[X_LOG_SIGNATUREMETHOD]); APSARA_TEST_EQUAL("lz4", req->mHeader[X_LOG_COMPRESSTYPE]); APSARA_TEST_EQUAL(bodyLenStr, req->mHeader[X_LOG_BODYRAWSIZE]); APSARA_TEST_EQUAL(LOG_MODE_BATCH_GROUP, req->mHeader[X_LOG_MODE]); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(MD5_SHA1_SALT_KEYPROVIDER, req->mHeader[X_LOG_KEYPROVIDER]); #endif APSARA_TEST_FALSE(req->mHeader[AUTHORIZATION].empty()); APSARA_TEST_EQUAL(body, req->mBody); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", req->mHost); #else APSARA_TEST_EQUAL("test_project.test_endpoint", req->mHost); #endif #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(80, req->mPort); #else APSARA_TEST_EQUAL(443, req->mPort); #endif APSARA_TEST_EQUAL(static_cast<uint32_t>(INT32_FLAG(default_http_request_timeout_sec)), req->mTimeout); APSARA_TEST_EQUAL(1U, req->mMaxTryCnt); APSARA_TEST_FALSE(req->mFollowRedirects); APSARA_TEST_EQUAL(&item, req->mItem); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", item.mCurrentHost); #else APSARA_TEST_EQUAL("test_project.test_endpoint", item.mCurrentHost); #endif } { // shard hash SLSSenderQueueItem item("hello, world!", rawSize, &flusher, flusher.GetQueueKey(), flusher.mLogstore, RawDataType::EVENT_GROUP, "hash_key"); APSARA_TEST_TRUE(flusher.BuildRequest(&item, req, &keepItem, &errMsg)); APSARA_TEST_EQUAL(HTTP_POST, req->mMethod); #ifdef __ENTERPRISE__ APSARA_TEST_FALSE(req->mHTTPSFlag); #else APSARA_TEST_TRUE(req->mHTTPSFlag); #endif APSARA_TEST_EQUAL("/logstores/test_logstore/shards/route", req->mUrl); map<string, string> params{{"key", "hash_key"}}; APSARA_TEST_EQUAL(GetQueryString(params), req->mQueryString); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(12U, req->mHeader.size()); #else APSARA_TEST_EQUAL(11U, req->mHeader.size()); #endif #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", req->mHeader[HOST]); #else APSARA_TEST_EQUAL("test_project.test_endpoint", req->mHeader[HOST]); #endif APSARA_TEST_EQUAL(SLSClientManager::GetInstance()->GetUserAgent(), req->mHeader[USER_AGENT]); APSARA_TEST_FALSE(req->mHeader[DATE].empty()); APSARA_TEST_EQUAL(TYPE_LOG_PROTOBUF, req->mHeader[CONTENT_TYPE]); APSARA_TEST_EQUAL(bodyLenStr, req->mHeader[CONTENT_LENGTH]); APSARA_TEST_EQUAL(CalcMD5(req->mBody), req->mHeader[CONTENT_MD5]); APSARA_TEST_EQUAL(LOG_API_VERSION, req->mHeader[X_LOG_APIVERSION]); APSARA_TEST_EQUAL(HMAC_SHA1, req->mHeader[X_LOG_SIGNATUREMETHOD]); APSARA_TEST_EQUAL("lz4", req->mHeader[X_LOG_COMPRESSTYPE]); APSARA_TEST_EQUAL(rawSizeStr, req->mHeader[X_LOG_BODYRAWSIZE]); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(MD5_SHA1_SALT_KEYPROVIDER, req->mHeader[X_LOG_KEYPROVIDER]); #endif APSARA_TEST_FALSE(req->mHeader[AUTHORIZATION].empty()); APSARA_TEST_EQUAL(body, req->mBody); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", req->mHost); #else APSARA_TEST_EQUAL("test_project.test_endpoint", req->mHost); #endif #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(80, req->mPort); #else APSARA_TEST_EQUAL(443, req->mPort); #endif APSARA_TEST_EQUAL(static_cast<uint32_t>(INT32_FLAG(default_http_request_timeout_sec)), req->mTimeout); APSARA_TEST_EQUAL(1U, req->mMaxTryCnt); APSARA_TEST_FALSE(req->mFollowRedirects); APSARA_TEST_EQUAL(&item, req->mItem); APSARA_TEST_FALSE(item.mRealIpFlag); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", item.mCurrentHost); #else APSARA_TEST_EQUAL("test_project.test_endpoint", item.mCurrentHost); #endif } { // exactly once auto cpt = make_shared<RangeCheckpoint>(); cpt->index = 0; cpt->data.set_hash_key("hash_key_0"); cpt->data.set_sequence_id(1); SLSSenderQueueItem item("hello, world!", rawSize, &flusher, flusher.GetQueueKey(), flusher.mLogstore, RawDataType::EVENT_GROUP, "hash_key_0", std::move(cpt)); APSARA_TEST_TRUE(flusher.BuildRequest(&item, req, &keepItem, &errMsg)); APSARA_TEST_EQUAL(HTTP_POST, req->mMethod); #ifdef __ENTERPRISE__ APSARA_TEST_FALSE(req->mHTTPSFlag); #else APSARA_TEST_TRUE(req->mHTTPSFlag); #endif APSARA_TEST_EQUAL("/logstores/test_logstore/shards/route", req->mUrl); map<string, string> params{{"key", "hash_key_0"}, {"seqid", "1"}}; APSARA_TEST_EQUAL(GetQueryString(params), req->mQueryString); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(12U, req->mHeader.size()); #else APSARA_TEST_EQUAL(11U, req->mHeader.size()); #endif #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", req->mHeader[HOST]); #else APSARA_TEST_EQUAL("test_project.test_endpoint", req->mHeader[HOST]); #endif APSARA_TEST_EQUAL(SLSClientManager::GetInstance()->GetUserAgent(), req->mHeader[USER_AGENT]); APSARA_TEST_FALSE(req->mHeader[DATE].empty()); APSARA_TEST_EQUAL(TYPE_LOG_PROTOBUF, req->mHeader[CONTENT_TYPE]); APSARA_TEST_EQUAL(bodyLenStr, req->mHeader[CONTENT_LENGTH]); APSARA_TEST_EQUAL(CalcMD5(req->mBody), req->mHeader[CONTENT_MD5]); APSARA_TEST_EQUAL(LOG_API_VERSION, req->mHeader[X_LOG_APIVERSION]); APSARA_TEST_EQUAL(HMAC_SHA1, req->mHeader[X_LOG_SIGNATUREMETHOD]); APSARA_TEST_EQUAL("lz4", req->mHeader[X_LOG_COMPRESSTYPE]); APSARA_TEST_EQUAL(rawSizeStr, req->mHeader[X_LOG_BODYRAWSIZE]); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(MD5_SHA1_SALT_KEYPROVIDER, req->mHeader[X_LOG_KEYPROVIDER]); #endif APSARA_TEST_FALSE(req->mHeader[AUTHORIZATION].empty()); APSARA_TEST_EQUAL(body, req->mBody); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", req->mHost); #else APSARA_TEST_EQUAL("test_project.test_endpoint", req->mHost); #endif #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(80, req->mPort); #else APSARA_TEST_EQUAL(443, req->mPort); #endif APSARA_TEST_EQUAL(static_cast<uint32_t>(INT32_FLAG(default_http_request_timeout_sec)), req->mTimeout); APSARA_TEST_EQUAL(1U, req->mMaxTryCnt); APSARA_TEST_FALSE(req->mFollowRedirects); APSARA_TEST_EQUAL(&item, req->mItem); APSARA_TEST_FALSE(item.mRealIpFlag); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", item.mCurrentHost); #else APSARA_TEST_EQUAL("test_project.test_endpoint", item.mCurrentHost); #endif } // metric telemtery type flusher.mTelemetryType = sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_METRICS; { SLSSenderQueueItem item("hello, world!", rawSize, &flusher, flusher.GetQueueKey(), flusher.mLogstore); APSARA_TEST_TRUE(flusher.BuildRequest(&item, req, &keepItem, &errMsg)); APSARA_TEST_EQUAL(HTTP_POST, req->mMethod); #ifdef __ENTERPRISE__ APSARA_TEST_FALSE(req->mHTTPSFlag); #else APSARA_TEST_TRUE(req->mHTTPSFlag); #endif APSARA_TEST_EQUAL("/prometheus/test_project/test_logstore/api/v1/write", req->mUrl); APSARA_TEST_EQUAL("", req->mQueryString); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(12U, req->mHeader.size()); #else APSARA_TEST_EQUAL(11U, req->mHeader.size()); #endif #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", req->mHeader[HOST]); #else APSARA_TEST_EQUAL("test_project.test_endpoint", req->mHeader[HOST]); #endif APSARA_TEST_EQUAL(SLSClientManager::GetInstance()->GetUserAgent(), req->mHeader[USER_AGENT]); APSARA_TEST_FALSE(req->mHeader[DATE].empty()); APSARA_TEST_EQUAL(TYPE_LOG_PROTOBUF, req->mHeader[CONTENT_TYPE]); APSARA_TEST_EQUAL(bodyLenStr, req->mHeader[CONTENT_LENGTH]); APSARA_TEST_EQUAL(CalcMD5(req->mBody), req->mHeader[CONTENT_MD5]); APSARA_TEST_EQUAL(LOG_API_VERSION, req->mHeader[X_LOG_APIVERSION]); APSARA_TEST_EQUAL(HMAC_SHA1, req->mHeader[X_LOG_SIGNATUREMETHOD]); APSARA_TEST_EQUAL("lz4", req->mHeader[X_LOG_COMPRESSTYPE]); APSARA_TEST_EQUAL(rawSizeStr, req->mHeader[X_LOG_BODYRAWSIZE]); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(MD5_SHA1_SALT_KEYPROVIDER, req->mHeader[X_LOG_KEYPROVIDER]); #endif APSARA_TEST_FALSE(req->mHeader[AUTHORIZATION].empty()); APSARA_TEST_EQUAL(body, req->mBody); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", req->mHost); #else APSARA_TEST_EQUAL("test_project.test_endpoint", req->mHost); #endif #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL(80, req->mPort); #else APSARA_TEST_EQUAL(443, req->mPort); #endif APSARA_TEST_EQUAL(static_cast<uint32_t>(INT32_FLAG(default_http_request_timeout_sec)), req->mTimeout); APSARA_TEST_EQUAL(1U, req->mMaxTryCnt); APSARA_TEST_FALSE(req->mFollowRedirects); APSARA_TEST_EQUAL(&item, req->mItem); APSARA_TEST_FALSE(item.mRealIpFlag); #ifdef __ENTERPRISE__ APSARA_TEST_EQUAL("test_project.test_region-b.log.aliyuncs.com", item.mCurrentHost); #else APSARA_TEST_EQUAL("test_project.test_endpoint", item.mCurrentHost); #endif } flusher.mTelemetryType = sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS; #ifdef __ENTERPRISE__ { // region mode changed EnterpriseSLSClientManager::GetInstance()->CopyLocalRegionEndpointsAndHttpsInfoIfNotExisted("test_region", "test_region-b"); auto old = flusher.mCandidateHostsInfo.get(); APSARA_TEST_FALSE(flusher.BuildRequest(&item, req, &keepItem, &errMsg)); APSARA_TEST_NOT_EQUAL(old, flusher.mCandidateHostsInfo.get()); EnterpriseSLSClientManager::GetInstance()->UpdateHostLatency("test_project", EndpointMode::ACCELERATE, "test_project." + kAccelerationDataEndpoint, chrono::milliseconds(10)); flusher.mCandidateHostsInfo->SelectBestHost(); APSARA_TEST_TRUE(flusher.BuildRequest(&item, req, &keepItem, &errMsg)); APSARA_TEST_EQUAL("test_project." + kAccelerationDataEndpoint, req->mHost); } // real ip BOOL_FLAG(send_prefer_real_ip) = true; { // ip not empty EnterpriseSLSClientManager::GetInstance()->SetRealIp("test_region-b", "192.168.0.1"); SLSSenderQueueItem item("hello, world!", rawSize, &flusher, flusher.GetQueueKey(), flusher.mLogstore); APSARA_TEST_TRUE(flusher.BuildRequest(&item, req, &keepItem, &errMsg)); APSARA_TEST_EQUAL(HTTP_POST, req->mMethod); APSARA_TEST_FALSE(req->mHTTPSFlag); APSARA_TEST_EQUAL("/logstores/test_logstore/shards/lb", req->mUrl); APSARA_TEST_EQUAL("", req->mQueryString); APSARA_TEST_EQUAL(12U, req->mHeader.size()); APSARA_TEST_EQUAL("test_project.192.168.0.1", req->mHeader[HOST]); APSARA_TEST_EQUAL(SLSClientManager::GetInstance()->GetUserAgent(), req->mHeader[USER_AGENT]); APSARA_TEST_FALSE(req->mHeader[DATE].empty()); APSARA_TEST_EQUAL(TYPE_LOG_PROTOBUF, req->mHeader[CONTENT_TYPE]); APSARA_TEST_EQUAL(bodyLenStr, req->mHeader[CONTENT_LENGTH]); APSARA_TEST_EQUAL(CalcMD5(req->mBody), req->mHeader[CONTENT_MD5]); APSARA_TEST_EQUAL(LOG_API_VERSION, req->mHeader[X_LOG_APIVERSION]); APSARA_TEST_EQUAL(HMAC_SHA1, req->mHeader[X_LOG_SIGNATUREMETHOD]); APSARA_TEST_EQUAL("lz4", req->mHeader[X_LOG_COMPRESSTYPE]); APSARA_TEST_EQUAL(rawSizeStr, req->mHeader[X_LOG_BODYRAWSIZE]); APSARA_TEST_EQUAL(MD5_SHA1_SALT_KEYPROVIDER, req->mHeader[X_LOG_KEYPROVIDER]); APSARA_TEST_FALSE(req->mHeader[AUTHORIZATION].empty()); APSARA_TEST_EQUAL(body, req->mBody); APSARA_TEST_EQUAL("192.168.0.1", req->mHost); APSARA_TEST_EQUAL(80, req->mPort); APSARA_TEST_EQUAL(static_cast<uint32_t>(INT32_FLAG(default_http_request_timeout_sec)), req->mTimeout); APSARA_TEST_EQUAL(1U, req->mMaxTryCnt); APSARA_TEST_FALSE(req->mFollowRedirects); APSARA_TEST_EQUAL(&item, req->mItem); APSARA_TEST_TRUE(item.mRealIpFlag); APSARA_TEST_EQUAL("192.168.0.1", item.mCurrentHost); } { // ip empty EnterpriseSLSClientManager::GetInstance()->SetRealIp("test_region-b", ""); SLSSenderQueueItem item("hello, world!", rawSize, &flusher, flusher.GetQueueKey(), flusher.mLogstore); APSARA_TEST_TRUE(flusher.BuildRequest(&item, req, &keepItem, &errMsg)); APSARA_TEST_EQUAL("test_project." + kAccelerationDataEndpoint, req->mHeader[HOST]); APSARA_TEST_EQUAL(SLSClientManager::GetInstance()->GetUserAgent(), req->mHeader[USER_AGENT]); APSARA_TEST_FALSE(req->mHeader[DATE].empty()); APSARA_TEST_EQUAL(TYPE_LOG_PROTOBUF, req->mHeader[CONTENT_TYPE]); APSARA_TEST_EQUAL(bodyLenStr, req->mHeader[CONTENT_LENGTH]); APSARA_TEST_EQUAL(CalcMD5(req->mBody), req->mHeader[CONTENT_MD5]); APSARA_TEST_EQUAL(LOG_API_VERSION, req->mHeader[X_LOG_APIVERSION]); APSARA_TEST_EQUAL(HMAC_SHA1, req->mHeader[X_LOG_SIGNATUREMETHOD]); APSARA_TEST_EQUAL("lz4", req->mHeader[X_LOG_COMPRESSTYPE]); APSARA_TEST_EQUAL(rawSizeStr, req->mHeader[X_LOG_BODYRAWSIZE]); APSARA_TEST_EQUAL(MD5_SHA1_SALT_KEYPROVIDER, req->mHeader[X_LOG_KEYPROVIDER]); APSARA_TEST_FALSE(req->mHeader[AUTHORIZATION].empty()); APSARA_TEST_EQUAL(body, req->mBody); APSARA_TEST_EQUAL("test_project." + kAccelerationDataEndpoint, req->mHost); APSARA_TEST_EQUAL(80, req->mPort); APSARA_TEST_EQUAL(static_cast<uint32_t>(INT32_FLAG(default_http_request_timeout_sec)), req->mTimeout); APSARA_TEST_EQUAL(1U, req->mMaxTryCnt); APSARA_TEST_FALSE(req->mFollowRedirects); APSARA_TEST_EQUAL(&item, req->mItem); APSARA_TEST_FALSE(item.mRealIpFlag); APSARA_TEST_EQUAL("test_project." + kAccelerationDataEndpoint, item.mCurrentHost); } { // ip empty, and region mode changed auto& endpoints = EnterpriseSLSClientManager::GetInstance()->mRegionCandidateEndpointsMap["test_region-b"]; endpoints.mMode = EndpointMode::CUSTOM; endpoints.mLocalEndpoints = {"custom.endpoint"}; auto old = flusher.mCandidateHostsInfo.get(); APSARA_TEST_FALSE(flusher.BuildRequest(&item, req, &keepItem, &errMsg)); APSARA_TEST_NOT_EQUAL(old, flusher.mCandidateHostsInfo.get()); EnterpriseSLSClientManager::GetInstance()->UpdateHostLatency( "test_project", EndpointMode::CUSTOM, "test_project.custom.endpoint", chrono::milliseconds(10)); flusher.mCandidateHostsInfo->SelectBestHost(); APSARA_TEST_TRUE(flusher.BuildRequest(&item, req, &keepItem, &errMsg)); APSARA_TEST_EQUAL("test_project.custom.endpoint", req->mHost); } BOOL_FLAG(send_prefer_real_ip) = false; #endif } void FlusherSLSUnittest::TestSend() { { // exactly once enabled // create flusher Json::Value configJson, optionalGoPipeline; string configStr, errorMsg; configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_region.log.aliyuncs.com", "Aliuid": "123456789" } )"; ParseJsonTable(configStr, configJson, errorMsg); FlusherSLS flusher; CollectionPipelineContext ctx; ctx.SetConfigName("test_config"); ctx.SetExactlyOnceFlag(true); flusher.SetContext(ctx); flusher.SetMetricsRecordRef(FlusherSLS::sName, "1"); flusher.Init(configJson, optionalGoPipeline); // create exactly once queue vector<RangeCheckpointPtr> checkpoints; for (size_t i = 0; i < 2; ++i) { auto cpt = make_shared<RangeCheckpoint>(); cpt->index = i; cpt->data.set_hash_key("hash_key_" + ToString(i)); cpt->data.set_sequence_id(0); checkpoints.emplace_back(cpt); } QueueKey eooKey = QueueKeyManager::GetInstance()->GetKey("eoo"); ExactlyOnceQueueManager::GetInstance()->CreateOrUpdateQueue( eooKey, ProcessQueueManager::sMaxPriority, flusher.GetContext(), checkpoints); { // replayed group PipelineEventGroup group(make_shared<SourceBuffer>()); group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id")); group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1"); group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid"); group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); auto cpt = make_shared<RangeCheckpoint>(); cpt->index = 1; cpt->fbKey = eooKey; cpt->data.set_hash_key("hash_key_1"); cpt->data.set_sequence_id(0); cpt->data.set_read_offset(0); cpt->data.set_read_length(10); group.SetExactlyOnceCheckpoint(cpt); auto e = group.AddLogEvent(); e->SetTimestamp(1234567890); e->SetContent(string("content_key"), string("content_value")); APSARA_TEST_TRUE(flusher.Send(std::move(group))); vector<SenderQueueItem*> res; ExactlyOnceQueueManager::GetInstance()->GetAvailableSenderQueueItems(res, 80); APSARA_TEST_EQUAL(1U, res.size()); auto item = static_cast<SLSSenderQueueItem*>(res[0]); APSARA_TEST_EQUAL(RawDataType::EVENT_GROUP, item->mType); APSARA_TEST_FALSE(item->mBufferOrNot); APSARA_TEST_EQUAL(&flusher, item->mFlusher); APSARA_TEST_EQUAL(eooKey, item->mQueueKey); APSARA_TEST_EQUAL("hash_key_1", item->mShardHashKey); APSARA_TEST_EQUAL(flusher.mLogstore, item->mLogstore); APSARA_TEST_EQUAL(cpt, item->mExactlyOnceCheckpoint); auto compressor = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", "1", CompressType::LZ4); string output, errorMsg; output.resize(item->mRawSize); APSARA_TEST_TRUE(compressor->UnCompress(item->mData, output, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(output)); APSARA_TEST_EQUAL("topic", logGroup.topic()); APSARA_TEST_EQUAL("uuid", logGroup.machineuuid()); APSARA_TEST_EQUAL("172.0.0.1", logGroup.source()); APSARA_TEST_EQUAL(1, logGroup.logtags_size()); APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(0).key()); APSARA_TEST_EQUAL(1, logGroup.logs_size()); APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size()); APSARA_TEST_EQUAL("content_key", logGroup.logs(0).contents(0).key()); APSARA_TEST_EQUAL("content_value", logGroup.logs(0).contents(0).value()); ExactlyOnceQueueManager::GetInstance()->RemoveSenderQueueItem(eooKey, item); } { // non-replay group flusher.mBatcher.GetEventFlushStrategy().SetMinCnt(1); PipelineEventGroup group(make_shared<SourceBuffer>()); group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id")); group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1"); group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid"); group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); auto cpt = make_shared<RangeCheckpoint>(); cpt->fbKey = eooKey; cpt->data.set_read_offset(0); cpt->data.set_read_length(10); group.SetExactlyOnceCheckpoint(cpt); auto e = group.AddLogEvent(); e->SetTimestamp(1234567890); e->SetContent(string("content_key"), string("content_value")); APSARA_TEST_TRUE(flusher.Send(std::move(group))); vector<SenderQueueItem*> res; ExactlyOnceQueueManager::GetInstance()->GetAvailableSenderQueueItems(res, 80); APSARA_TEST_EQUAL(1U, res.size()); auto item = static_cast<SLSSenderQueueItem*>(res[0]); APSARA_TEST_EQUAL(RawDataType::EVENT_GROUP, item->mType); APSARA_TEST_FALSE(item->mBufferOrNot); APSARA_TEST_EQUAL(&flusher, item->mFlusher); APSARA_TEST_EQUAL(eooKey, item->mQueueKey); APSARA_TEST_EQUAL("hash_key_0", item->mShardHashKey); APSARA_TEST_EQUAL(flusher.mLogstore, item->mLogstore); APSARA_TEST_EQUAL(checkpoints[0], item->mExactlyOnceCheckpoint); auto compressor = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", "1", CompressType::LZ4); string output, errorMsg; output.resize(item->mRawSize); APSARA_TEST_TRUE(compressor->UnCompress(item->mData, output, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(output)); APSARA_TEST_EQUAL("topic", logGroup.topic()); APSARA_TEST_EQUAL("uuid", logGroup.machineuuid()); APSARA_TEST_EQUAL("172.0.0.1", logGroup.source()); APSARA_TEST_EQUAL(1, logGroup.logtags_size()); APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(0).key()); APSARA_TEST_EQUAL(1, logGroup.logs_size()); APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size()); APSARA_TEST_EQUAL("content_key", logGroup.logs(0).contents(0).key()); APSARA_TEST_EQUAL("content_value", logGroup.logs(0).contents(0).value()); ExactlyOnceQueueManager::GetInstance()->RemoveSenderQueueItem(eooKey, item); } } { // normal flusher, without group batch Json::Value configJson, optionalGoPipeline; string configStr, errorMsg; configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_region.log.aliyuncs.com", "Aliuid": "123456789", "ShardHashKeys": [ "tag_key" ] } )"; ParseJsonTable(configStr, configJson, errorMsg); FlusherSLS flusher; flusher.SetContext(ctx); flusher.SetMetricsRecordRef(FlusherSLS::sName, "1"); flusher.Init(configJson, optionalGoPipeline); { // empty group PipelineEventGroup group(make_shared<SourceBuffer>()); APSARA_TEST_TRUE(flusher.Send(std::move(group))); vector<SenderQueueItem*> res; SenderQueueManager::GetInstance()->GetAvailableItems(res, 80); APSARA_TEST_TRUE(res.empty()); } { // group flusher.mBatcher.GetEventFlushStrategy().SetMinCnt(1); PipelineEventGroup group(make_shared<SourceBuffer>()); group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id")); group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1"); group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid"); group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); group.SetTag(string("tag_key"), string("tag_value")); auto e = group.AddLogEvent(); e->SetTimestamp(1234567890); e->SetContent(string("content_key"), string("content_value")); APSARA_TEST_TRUE(flusher.Send(std::move(group))); vector<SenderQueueItem*> res; SenderQueueManager::GetInstance()->GetAvailableItems(res, 80); APSARA_TEST_EQUAL(1U, res.size()); auto item = static_cast<SLSSenderQueueItem*>(res[0]); APSARA_TEST_EQUAL(RawDataType::EVENT_GROUP, item->mType); APSARA_TEST_TRUE(item->mBufferOrNot); APSARA_TEST_EQUAL(&flusher, item->mFlusher); APSARA_TEST_EQUAL(flusher.mQueueKey, item->mQueueKey); APSARA_TEST_EQUAL(CalcMD5("tag_value"), item->mShardHashKey); APSARA_TEST_EQUAL(flusher.mLogstore, item->mLogstore); auto compressor = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", "1", CompressType::LZ4); string output, errorMsg; output.resize(item->mRawSize); APSARA_TEST_TRUE(compressor->UnCompress(item->mData, output, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(output)); APSARA_TEST_EQUAL("topic", logGroup.topic()); APSARA_TEST_EQUAL("uuid", logGroup.machineuuid()); APSARA_TEST_EQUAL("172.0.0.1", logGroup.source()); APSARA_TEST_EQUAL(2, logGroup.logtags_size()); APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(0).key()); APSARA_TEST_EQUAL("tag_key", logGroup.logtags(1).key()); APSARA_TEST_EQUAL("tag_value", logGroup.logtags(1).value()); APSARA_TEST_EQUAL(1, logGroup.logs_size()); APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size()); APSARA_TEST_EQUAL("content_key", logGroup.logs(0).contents(0).key()); APSARA_TEST_EQUAL("content_value", logGroup.logs(0).contents(0).value()); SenderQueueManager::GetInstance()->RemoveItem(item->mQueueKey, item); flusher.mBatcher.GetEventFlushStrategy().SetMinCnt(4000); } { // oversized group INT32_FLAG(max_send_log_group_size) = 1; flusher.mBatcher.GetEventFlushStrategy().SetMinCnt(1); PipelineEventGroup group(make_shared<SourceBuffer>()); auto e = group.AddLogEvent(); e->SetTimestamp(1234567890); e->SetContent(string("content_key"), string("content_value")); APSARA_TEST_FALSE(flusher.Send(std::move(group))); INT32_FLAG(max_send_log_group_size) = 10 * 1024 * 1024; flusher.mBatcher.GetEventFlushStrategy().SetMinCnt(4000); } } { // normal flusher, with group batch Json::Value configJson, optionalGoPipeline; string configStr, errorMsg; configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_region.log.aliyuncs.com", "Aliuid": "123456789" } )"; ParseJsonTable(configStr, configJson, errorMsg); FlusherSLS flusher; flusher.SetContext(ctx); flusher.SetMetricsRecordRef(FlusherSLS::sName, "1"); flusher.Init(configJson, optionalGoPipeline); PipelineEventGroup group(make_shared<SourceBuffer>()); group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id")); group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1"); group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid"); group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); { auto e = group.AddLogEvent(); e->SetTimestamp(1234567890); e->SetContent(string("content_key"), string("content_value")); } { auto e = group.AddLogEvent(); e->SetTimestamp(1234567990); e->SetContent(string("content_key"), string("content_value")); } flusher.mBatcher.GetGroupFlushStrategy()->SetMinSizeBytes(group.DataSize()); // flush the above two events from group item by the following event { auto e = group.AddLogEvent(); e->SetTimestamp(1234568990); e->SetContent(string("content_key"), string("content_value")); } APSARA_TEST_TRUE(flusher.Send(std::move(group))); vector<SenderQueueItem*> res; SenderQueueManager::GetInstance()->GetAvailableItems(res, 80); APSARA_TEST_EQUAL(1U, res.size()); auto item = static_cast<SLSSenderQueueItem*>(res[0]); APSARA_TEST_EQUAL(RawDataType::EVENT_GROUP_LIST, item->mType); APSARA_TEST_TRUE(item->mBufferOrNot); APSARA_TEST_EQUAL(&flusher, item->mFlusher); APSARA_TEST_EQUAL(flusher.mQueueKey, item->mQueueKey); APSARA_TEST_EQUAL("", item->mShardHashKey); APSARA_TEST_EQUAL(flusher.mLogstore, item->mLogstore); auto compressor = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", "1", CompressType::LZ4); sls_logs::SlsLogPackageList packageList; APSARA_TEST_TRUE(packageList.ParseFromString(item->mData)); APSARA_TEST_EQUAL(2, packageList.packages_size()); uint32_t rawSize = 0; for (size_t i = 0; i < 2; ++i) { APSARA_TEST_EQUAL(sls_logs::SlsCompressType::SLS_CMP_LZ4, packageList.packages(i).compress_type()); string output, errorMsg; rawSize += packageList.packages(i).uncompress_size(); output.resize(packageList.packages(i).uncompress_size()); APSARA_TEST_TRUE(compressor->UnCompress(packageList.packages(i).data(), output, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(output)); APSARA_TEST_EQUAL("topic", logGroup.topic()); APSARA_TEST_EQUAL("uuid", logGroup.machineuuid()); APSARA_TEST_EQUAL("172.0.0.1", logGroup.source()); APSARA_TEST_EQUAL(1, logGroup.logtags_size()); APSARA_TEST_EQUAL("__pack_id__", logGroup.logtags(0).key()); APSARA_TEST_EQUAL(1, logGroup.logs_size()); if (i == 0) { APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); } else { APSARA_TEST_EQUAL(1234567990U, logGroup.logs(0).time()); } APSARA_TEST_EQUAL(1, logGroup.logs(0).contents_size()); APSARA_TEST_EQUAL("content_key", logGroup.logs(0).contents(0).key()); APSARA_TEST_EQUAL("content_value", logGroup.logs(0).contents(0).value()); } APSARA_TEST_EQUAL(rawSize, item->mRawSize); SenderQueueManager::GetInstance()->RemoveItem(item->mQueueKey, item); flusher.FlushAll(); res.clear(); SenderQueueManager::GetInstance()->GetAvailableItems(res, 80); for (auto& tmp : res) { SenderQueueManager::GetInstance()->RemoveItem(tmp->mQueueKey, tmp); } flusher.mBatcher.GetGroupFlushStrategy()->SetMinSizeBytes(256 * 1024); } } void FlusherSLSUnittest::TestFlush() { Json::Value configJson, optionalGoPipeline; string configStr, errorMsg; configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_region.log.aliyuncs.com", "Aliuid": "123456789" } )"; ParseJsonTable(configStr, configJson, errorMsg); FlusherSLS flusher; flusher.SetContext(ctx); flusher.SetMetricsRecordRef(FlusherSLS::sName, "1"); flusher.Init(configJson, optionalGoPipeline); PipelineEventGroup group(make_shared<SourceBuffer>()); group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id")); group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1"); group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid"); group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); { auto e = group.AddLogEvent(); e->SetTimestamp(1234567890); e->SetContent(string("content_key"), string("content_value")); } size_t batchKey = group.GetTagsHash(); flusher.Send(std::move(group)); flusher.Flush(batchKey); vector<SenderQueueItem*> res; SenderQueueManager::GetInstance()->GetAvailableItems(res, 80); APSARA_TEST_EQUAL(0U, res.size()); flusher.Flush(0); SenderQueueManager::GetInstance()->GetAvailableItems(res, 80); APSARA_TEST_EQUAL(1U, res.size()); } void FlusherSLSUnittest::TestFlushAll() { Json::Value configJson, optionalGoPipeline; string configStr, errorMsg; configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_region.log.aliyuncs.com", "Aliuid": "123456789" } )"; ParseJsonTable(configStr, configJson, errorMsg); FlusherSLS flusher; flusher.SetContext(ctx); flusher.SetMetricsRecordRef(FlusherSLS::sName, "1"); flusher.Init(configJson, optionalGoPipeline); PipelineEventGroup group(make_shared<SourceBuffer>()); group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id")); group.SetTag(LOG_RESERVED_KEY_SOURCE, "172.0.0.1"); group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "uuid"); group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); { auto e = group.AddLogEvent(); e->SetTimestamp(1234567890); e->SetContent(string("content_key"), string("content_value")); } flusher.Send(std::move(group)); flusher.FlushAll(); vector<SenderQueueItem*> res; SenderQueueManager::GetInstance()->GetAvailableItems(res, 80); APSARA_TEST_EQUAL(1U, res.size()); } void FlusherSLSUnittest::TestAddPackId() { FlusherSLS flusher; flusher.mProject = "test_project"; flusher.mLogstore = "test_logstore"; BatchedEvents batch; batch.mPackIdPrefix = "source-id"; batch.mSourceBuffers.emplace_back(make_shared<SourceBuffer>()); flusher.AddPackId(batch); APSARA_TEST_STREQ("34451096883514E2-0", batch.mTags.mInner["__pack_id__"].data()); } void FlusherSLSUnittest::OnGoPipelineSend() { { Json::Value configJson, optionalGoPipeline; string configStr, errorMsg; configStr = R"( { "Type": "flusher_sls", "Project": "test_project", "Logstore": "test_logstore", "Region": "test_region", "Endpoint": "test_region.log.aliyuncs.com", "Aliuid": "123456789" } )"; ParseJsonTable(configStr, configJson, errorMsg); FlusherSLS flusher; flusher.SetContext(ctx); flusher.SetMetricsRecordRef(FlusherSLS::sName, "1"); flusher.Init(configJson, optionalGoPipeline); { APSARA_TEST_TRUE(flusher.Send("content", "shardhash_key", "other_logstore")); vector<SenderQueueItem*> res; SenderQueueManager::GetInstance()->GetAvailableItems(res, 80); APSARA_TEST_EQUAL(1U, res.size()); auto item = static_cast<SLSSenderQueueItem*>(res[0]); APSARA_TEST_EQUAL(RawDataType::EVENT_GROUP, item->mType); APSARA_TEST_TRUE(item->mBufferOrNot); APSARA_TEST_EQUAL(&flusher, item->mFlusher); APSARA_TEST_EQUAL(flusher.mQueueKey, item->mQueueKey); APSARA_TEST_EQUAL("shardhash_key", item->mShardHashKey); APSARA_TEST_EQUAL("other_logstore", item->mLogstore); auto compressor = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", "1", CompressType::LZ4); string output; output.resize(item->mRawSize); APSARA_TEST_TRUE(compressor->UnCompress(item->mData, output, errorMsg)); APSARA_TEST_EQUAL("content", output); } { APSARA_TEST_TRUE(flusher.Send("content", "shardhash_key", "")); vector<SenderQueueItem*> res; SenderQueueManager::GetInstance()->GetAvailableItems(res, 80); APSARA_TEST_EQUAL(1U, res.size()); auto item = static_cast<SLSSenderQueueItem*>(res[0]); APSARA_TEST_EQUAL("test_logstore", item->mLogstore); } } { // go profile flusher has no context FlusherSLS flusher; flusher.mProject = "test_project"; flusher.mLogstore = "test_logstore"; flusher.mCompressor = CompressorFactory::GetInstance()->Create( Json::Value(), CollectionPipelineContext(), "flusher_sls", "1", CompressType::LZ4); APSARA_TEST_TRUE(flusher.Send("content", "")); auto key = QueueKeyManager::GetInstance()->GetKey("test_project-test_logstore"); APSARA_TEST_NOT_EQUAL(nullptr, SenderQueueManager::GetInstance()->GetQueue(key)); vector<SenderQueueItem*> res; SenderQueueManager::GetInstance()->GetAvailableItems(res, 80); APSARA_TEST_EQUAL(1U, res.size()); auto item = static_cast<SLSSenderQueueItem*>(res[0]); APSARA_TEST_EQUAL(RawDataType::EVENT_GROUP, item->mType); APSARA_TEST_TRUE(item->mBufferOrNot); APSARA_TEST_EQUAL(&flusher, item->mFlusher); APSARA_TEST_EQUAL(key, item->mQueueKey); APSARA_TEST_EQUAL("test_logstore", item->mLogstore); auto compressor = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", "1", CompressType::LZ4); string output; output.resize(item->mRawSize); string errorMsg; APSARA_TEST_TRUE(compressor->UnCompress(item->mData, output, errorMsg)); APSARA_TEST_EQUAL("content", output); } } UNIT_TEST_CASE(FlusherSLSUnittest, OnSuccessfulInit) UNIT_TEST_CASE(FlusherSLSUnittest, OnFailedInit) UNIT_TEST_CASE(FlusherSLSUnittest, OnPipelineUpdate) UNIT_TEST_CASE(FlusherSLSUnittest, TestBuildRequest) UNIT_TEST_CASE(FlusherSLSUnittest, TestSend) UNIT_TEST_CASE(FlusherSLSUnittest, TestFlush) UNIT_TEST_CASE(FlusherSLSUnittest, TestFlushAll) UNIT_TEST_CASE(FlusherSLSUnittest, TestAddPackId) UNIT_TEST_CASE(FlusherSLSUnittest, OnGoPipelineSend) } // namespace logtail UNIT_TEST_MAIN