in core/unittest/flusher/FlusherSLSUnittest.cpp [98:587]
// Feeds FlusherSLS::Init() a series of configurations that must all be accepted and checks the
// flusher state after each one. Cases, in order:
//   1. mandatory parameters only
//   2. valid optional parameters
//   3. optional parameters with the wrong JSON type
//   4. (enterprise) region on private cloud, then EndpointMode / Endpoint combinations
//   5. (community) Endpoint trimming
//   6. TelemetryType
//   7. ShardHashKeys together with the exactly-once flag
//   8. group batching / sender queue creation
//   9. APM telemetry types
//  10. parameters forwarded to the Go pipeline
//
// Every case builds a fresh FlusherSLS; most cases end by clearing SenderQueueManager.
//
// The JSON raw-string bodies are deliberately left flush-left: everything between R"( and )" is
// part of the literal, so indenting those lines would change the string handed to the parser.
void FlusherSLSUnittest::OnSuccessfulInit() {
    unique_ptr<FlusherSLS> flusher;
    Json::Value configJson, optionalGoPipelineJson, optionalGoPipeline;
    string configStr, optionalGoPipelineStr, errorMsg;

    // only mandatory param
    configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore"
}
)";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
#ifndef __ENTERPRISE__
    // The community build is given an explicit endpoint here; the enterprise build is not.
    configJson["Endpoint"] = "test_region.log.aliyuncs.com";
#endif
    flusher.reset(new FlusherSLS());
    flusher->SetContext(ctx);
    flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    // Nothing is expected to be handed to the Go pipeline in this case.
    APSARA_TEST_TRUE(optionalGoPipeline.isNull());
    APSARA_TEST_EQUAL("test_project", flusher->mProject);
    APSARA_TEST_EQUAL("test_logstore", flusher->mLogstore);
    APSARA_TEST_EQUAL(STRING_FLAG(default_region_name), flusher->mRegion);
#ifndef __ENTERPRISE__
    APSARA_TEST_EQUAL("test_region.log.aliyuncs.com", flusher->mEndpoint);
#endif
    APSARA_TEST_EQUAL("", flusher->mAliuid);
    APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS, flusher->mTelemetryType);
    APSARA_TEST_TRUE(flusher->mShardHashKeys.empty());
    // Event-level flush strategy is expected to be derived from the global flags.
    APSARA_TEST_EQUAL(static_cast<uint32_t>(INT32_FLAG(merge_log_count_limit)),
                      flusher->mBatcher.GetEventFlushStrategy().GetMinCnt());
    APSARA_TEST_EQUAL(
        static_cast<uint32_t>(INT32_FLAG(max_send_log_group_size) / DOUBLE_FLAG(sls_serialize_size_expansion_ratio)),
        flusher->mBatcher.GetEventFlushStrategy().GetMaxSizeBytes());
    APSARA_TEST_EQUAL(static_cast<uint32_t>(INT32_FLAG(batch_send_metric_size)),
                      flusher->mBatcher.GetEventFlushStrategy().GetMinSizeBytes());
    // Expected split of batch_send_interval: the group stage gets half (integer division) and
    // the event stage gets whatever remains.
    uint32_t timeout = static_cast<uint32_t>(INT32_FLAG(batch_send_interval)) / 2;
    APSARA_TEST_EQUAL(static_cast<uint32_t>(INT32_FLAG(batch_send_interval)) - timeout,
                      flusher->mBatcher.GetEventFlushStrategy().GetTimeoutSecs());
    APSARA_TEST_TRUE(flusher->mBatcher.GetGroupFlushStrategy().has_value());
    APSARA_TEST_EQUAL(static_cast<uint32_t>(INT32_FLAG(batch_send_metric_size)),
                      flusher->mBatcher.GetGroupFlushStrategy()->GetMinSizeBytes());
    APSARA_TEST_EQUAL(timeout, flusher->mBatcher.GetGroupFlushStrategy()->GetTimeoutSecs());
    APSARA_TEST_TRUE(flusher->mGroupSerializer);
    APSARA_TEST_TRUE(flusher->mGroupListSerializer);
    APSARA_TEST_EQUAL(CompressType::LZ4, flusher->mCompressor->GetCompressType());
    APSARA_TEST_EQUAL(QueueKeyManager::GetInstance()->GetKey("test_config-flusher_sls-test_project#test_logstore"),
                      flusher->GetQueueKey());
    // A sender queue must have been registered under the flusher's key.
    auto que = SenderQueueManager::GetInstance()->GetQueue(flusher->GetQueueKey());
    APSARA_TEST_NOT_EQUAL(nullptr, que);
    APSARA_TEST_FALSE(que->GetRateLimiter().has_value());
    APSARA_TEST_EQUAL(3U, que->GetConcurrencyLimiters().size());

    // valid optional param
    configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_region.log.aliyuncs.com",
"Aliuid": "123456789",
"TelemetryType": "logs",
"ShardHashKeys": [
"__source__"
]
}
)";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
#ifdef __ENTERPRISE__
    // mEndpointMode is asserted only in enterprise builds (below), so the parameter is supplied
    // under the same guard.
    configJson["EndpointMode"] = "default";
#endif
    flusher.reset(new FlusherSLS());
    flusher->SetContext(ctx);
    flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_EQUAL("test_region", flusher->mRegion);
#ifdef __ENTERPRISE__
    APSARA_TEST_EQUAL("123456789", flusher->mAliuid);
    APSARA_TEST_EQUAL(EndpointMode::DEFAULT, flusher->mEndpointMode);
#else
    // The community build is expected to leave Aliuid empty even though it was configured.
    APSARA_TEST_EQUAL("", flusher->mAliuid);
#endif
    APSARA_TEST_EQUAL("test_region.log.aliyuncs.com", flusher->mEndpoint);
    APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS, flusher->mTelemetryType);
    APSARA_TEST_EQUAL(1U, flusher->mShardHashKeys.size());
    APSARA_TEST_EQUAL("__source__", flusher->mShardHashKeys[0]);
    SenderQueueManager::GetInstance()->Clear();

    // invalid optional param: every optional value has the wrong JSON type (bool). Init() must
    // still succeed and the fields must hold the same values as when they are absent.
    configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": true,
"Aliuid": true,
"TelemetryType": true,
"ShardHashKeys": true
}
)";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
#ifdef __ENTERPRISE__
    configJson["EndpointMode"] = true;
    configJson["Endpoint"] = true;
#else
    configJson["Endpoint"] = "test_region.log.aliyuncs.com";
#endif
    flusher.reset(new FlusherSLS());
    flusher->SetContext(ctx);
    flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_EQUAL(STRING_FLAG(default_region_name), flusher->mRegion);
#ifdef __ENTERPRISE__
    APSARA_TEST_EQUAL(EndpointMode::DEFAULT, flusher->mEndpointMode);
    APSARA_TEST_EQUAL("", flusher->mEndpoint);
#endif
    APSARA_TEST_EQUAL("", flusher->mAliuid);
    APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS, flusher->mTelemetryType);
    APSARA_TEST_TRUE(flusher->mShardHashKeys.empty());
    SenderQueueManager::GetInstance()->Clear();

#ifdef __ENTERPRISE__
    // region: with the private-cloud switch on, the configured "Region" is expected to be
    // replaced by the default region name.
    EnterpriseConfigProvider::GetInstance()->mIsPrivateCloud = true;
    configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_region.log.aliyuncs.com"
}
)";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    flusher.reset(new FlusherSLS());
    flusher->SetContext(ctx);
    flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_EQUAL(STRING_FLAG(default_region_name), flusher->mRegion);
    // Restore the switch so the remaining cases run as public cloud.
    EnterpriseConfigProvider::GetInstance()->mIsPrivateCloud = false;
    SenderQueueManager::GetInstance()->Clear();
#endif

#ifdef __ENTERPRISE__
    // EndpointMode && Endpoint
    // Start from an empty client manager so the endpoint-map assertions below are deterministic.
    EnterpriseSLSClientManager::GetInstance()->Clear();

    // Endpoint ignored in accelerate mode: no candidate-endpoint entry may appear for the region.
    configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"EndpointMode": "accelerate",
"Endpoint": " test_region.log.aliyuncs.com "
}
)";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    flusher.reset(new FlusherSLS());
    flusher->SetContext(ctx);
    flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_EQUAL(EndpointMode::ACCELERATE, flusher->mEndpointMode);
    APSARA_TEST_EQUAL(EnterpriseSLSClientManager::GetInstance()->mRegionCandidateEndpointsMap.end(),
                      EnterpriseSLSClientManager::GetInstance()->mRegionCandidateEndpointsMap.find("test_region"));
    APSARA_TEST_EQUAL(flusher->mProject, flusher->mCandidateHostsInfo->GetProject());
    APSARA_TEST_EQUAL(flusher->mRegion, flusher->mCandidateHostsInfo->GetRegion());
    APSARA_TEST_EQUAL(EndpointMode::ACCELERATE, flusher->mCandidateHostsInfo->GetMode());
    SenderQueueManager::GetInstance()->Clear();

    // region remote endpoints not existed: an unrecognised EndpointMode is expected to fall back
    // to DEFAULT, and the (trimmed) Endpoint becomes the region's only remote endpoint.
    configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"EndpointMode": "unknown",
"Endpoint": " test_region.log.aliyuncs.com "
}
)";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    flusher.reset(new FlusherSLS());
    flusher->SetContext(ctx);
    flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_EQUAL(EndpointMode::DEFAULT, flusher->mEndpointMode);
    auto& endpoints
        = EnterpriseSLSClientManager::GetInstance()->mRegionCandidateEndpointsMap["test_region"].mRemoteEndpoints;
    APSARA_TEST_EQUAL(1U, endpoints.size());
    APSARA_TEST_EQUAL("test_region.log.aliyuncs.com", endpoints[0]);
    APSARA_TEST_EQUAL(flusher->mProject, flusher->mCandidateHostsInfo->GetProject());
    APSARA_TEST_EQUAL(flusher->mRegion, flusher->mCandidateHostsInfo->GetRegion());
    APSARA_TEST_EQUAL(EndpointMode::DEFAULT, flusher->mCandidateHostsInfo->GetMode());
    SenderQueueManager::GetInstance()->Clear();

    // region remote endpoints existed: a second, different Endpoint for the same region is
    // expected to be appended after the one registered by the previous case.
    configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"EndpointMode": "default",
"Endpoint": " test_region-intranet.log.aliyuncs.com "
}
)";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    flusher.reset(new FlusherSLS());
    flusher->SetContext(ctx);
    flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_EQUAL(EndpointMode::DEFAULT, flusher->mEndpointMode);
    {
        // Bind a fresh reference for the second look-up. A C++ reference cannot be rebound, so
        // assigning to `endpoints` would merely copy the vector onto itself.
        auto& mergedEndpoints
            = EnterpriseSLSClientManager::GetInstance()->mRegionCandidateEndpointsMap["test_region"].mRemoteEndpoints;
        APSARA_TEST_EQUAL(2U, mergedEndpoints.size());
        APSARA_TEST_EQUAL("test_region.log.aliyuncs.com", mergedEndpoints[0]);
        APSARA_TEST_EQUAL("test_region-intranet.log.aliyuncs.com", mergedEndpoints[1]);
    }
    APSARA_TEST_EQUAL(flusher->mProject, flusher->mCandidateHostsInfo->GetProject());
    APSARA_TEST_EQUAL(flusher->mRegion, flusher->mCandidateHostsInfo->GetRegion());
    APSARA_TEST_EQUAL(EndpointMode::DEFAULT, flusher->mCandidateHostsInfo->GetMode());
    SenderQueueManager::GetInstance()->Clear();
#endif

#ifndef __ENTERPRISE__
    // Endpoint: surrounding whitespace in the configured value is expected to be trimmed.
    configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": " test_region.log.aliyuncs.com "
}
)";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    flusher.reset(new FlusherSLS());
    flusher->SetContext(ctx);
    flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_EQUAL("test_region.log.aliyuncs.com", flusher->mEndpoint);
    SenderQueueManager::GetInstance()->Clear();
#endif

    // TelemetryType: "metrics" selects the metrics type ...
    configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_region.log.aliyuncs.com",
"TelemetryType": "metrics"
}
)";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    flusher.reset(new FlusherSLS());
    flusher->SetContext(ctx);
    flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_METRICS, flusher->mTelemetryType);
    SenderQueueManager::GetInstance()->Clear();

    // ... while an unrecognised value is expected to fall back to logs.
    configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_region.log.aliyuncs.com",
"TelemetryType": "unknown"
}
)";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    flusher.reset(new FlusherSLS());
    flusher->SetContext(ctx);
    flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS, flusher->mTelemetryType);
    SenderQueueManager::GetInstance()->Clear();

    // ShardHashKeys: with the exactly-once flag set on the context, the configured keys are
    // expected to be dropped.
    configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_region.log.aliyuncs.com",
"ShardHashKeys": [
"__source__"
]
}
)";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    flusher.reset(new FlusherSLS());
    ctx.SetExactlyOnceFlag(true);
    flusher->SetContext(ctx);
    flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_TRUE(flusher->mShardHashKeys.empty());
    ctx.SetExactlyOnceFlag(false);
    SenderQueueManager::GetInstance()->Clear();

    // group batch && sender queue
    // (a) ShardHashKeys without exactly-once: no group-level flush strategy is expected.
    configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_region.log.aliyuncs.com",
"ShardHashKeys": [
"__source__"
]
}
)";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    flusher.reset(new FlusherSLS());
    flusher->SetContext(ctx);
    flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_FALSE(flusher->mBatcher.GetGroupFlushStrategy().has_value());
    SenderQueueManager::GetInstance()->Clear();

    // (b) exactly-once: no group-level flush strategy, and no queue is registered with
    // SenderQueueManager under the flusher's key.
    configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_region.log.aliyuncs.com"
}
)";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    flusher.reset(new FlusherSLS());
    ctx.SetExactlyOnceFlag(true);
    flusher->SetContext(ctx);
    flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_FALSE(flusher->mBatcher.GetGroupFlushStrategy().has_value());
    APSARA_TEST_EQUAL(nullptr, SenderQueueManager::GetInstance()->GetQueue(flusher->GetQueueKey()));
    ctx.SetExactlyOnceFlag(false);
    SenderQueueManager::GetInstance()->Clear();

    // (c) metrics telemetry: no group-level flush strategy is expected either.
    configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_region.log.aliyuncs.com",
"TelemetryType": "metrics"
}
)";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    flusher.reset(new FlusherSLS());
    flusher->SetContext(ctx);
    flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_FALSE(flusher->mBatcher.GetGroupFlushStrategy().has_value());
    SenderQueueManager::GetInstance()->Clear();

    // apm: the three vectors below are parallel; entry ii of apmConfigStr must produce
    // apmSubpath[ii] and apmTelemetryTypes[ii].
    std::vector<std::string> apmConfigStr = {R"(
{
"Type": "flusher_sls",
"TelemetryType": "arms_traces",
"Project": "test_project",
"Region": "test_region",
"Endpoint": "test_endpoint",
"Match": {
"Type": "tag",
"Key": "data_type",
"Value": "trace"
}
}
)",
                                             R"(
{
"Type": "flusher_sls",
"TelemetryType": "arms_metrics",
"Project": "test_project",
"Region": "test_region",
"Endpoint": "test_endpoint",
"Match": {
"Type": "tag",
"Key": "data_type",
"Value": "metric"
}
}
)",
                                             R"(
{
"Type": "flusher_sls",
"TelemetryType": "arms_agentinfo",
"Project": "test_project",
"Region": "test_region",
"Endpoint": "test_endpoint",
"Match": {
"Type": "tag",
"Key": "data_type",
"Value": "agent_info"
}
}
)"};
    std::vector<std::string> apmSubpath = {APM_TRACES_URL, APM_METRICS_URL, APM_AGENTINFOS_URL};
    std::vector<sls_logs::SlsTelemetryType> apmTelemetryTypes = {
        sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_TRACES,
        sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_METRICS,
        sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_AGENTINFOS,
    };
    for (size_t ii = 0; ii < apmConfigStr.size(); ii++) {
        auto& cfg = apmConfigStr[ii];
        APSARA_TEST_TRUE(ParseJsonTable(cfg, configJson, errorMsg));
        flusher.reset(new FlusherSLS());
        flusher->SetContext(ctx);
        flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
        APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
        APSARA_TEST_EQUAL(flusher->mSubpath, apmSubpath[ii]);
        APSARA_TEST_EQUAL(flusher->mTelemetryType, apmTelemetryTypes[ii]);
        SenderQueueManager::GetInstance()->Clear();
    }

    // go param: with the context flagged as flushing through the Go pipeline, Init() is expected
    // to fill optionalGoPipeline with a "flusher_sls/<id>" entry.
    ctx.SetIsFlushingThroughGoPipelineFlag(true);
    // (a) nothing Go-specific configured: the generated entry has an empty "detail".
    configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "cn-hangzhou",
"Endpoint": "cn-hangzhou.log.aliyuncs.com"
}
)";
    optionalGoPipelineStr = R"(
{
"flushers": [
{
"type": "flusher_sls/4",
"detail": {}
}
]
}
)";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    APSARA_TEST_TRUE(ParseJsonTable(optionalGoPipelineStr, optionalGoPipelineJson, errorMsg));
    pipeline.mPluginID.store(4);
    flusher.reset(new FlusherSLS());
    flusher->SetContext(ctx);
    flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_TRUE(optionalGoPipelineJson == optionalGoPipeline);
    // Reset the output so the next sub-case starts from an empty Go pipeline.
    optionalGoPipeline.clear();

    // (b) "EnableShardHash" is expected to be copied into the generated entry's "detail".
    configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_region.log.aliyuncs.com",
"EnableShardHash": false
}
)";
    optionalGoPipelineStr = R"(
{
"flushers": [
{
"type": "flusher_sls/4",
"detail": {
"EnableShardHash": false
}
}
]
}
)";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    APSARA_TEST_TRUE(ParseJsonTable(optionalGoPipelineStr, optionalGoPipelineJson, errorMsg));
    pipeline.mPluginID.store(4);
    flusher.reset(new FlusherSLS());
    flusher->SetContext(ctx);
    flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_EQUAL(optionalGoPipelineJson.toStyledString(), optionalGoPipeline.toStyledString());
    ctx.SetIsFlushingThroughGoPipelineFlag(false);
    SenderQueueManager::GetInstance()->Clear();
}