void FlusherSLSUnittest::OnSuccessfulInit()

in core/unittest/flusher/FlusherSLSUnittest.cpp [98:587]


// Exercises FlusherSLS::Init() with a series of configurations that are all
// expected to be accepted, and checks the state the flusher ends up in:
// mandatory-only config, valid and wrongly-typed optional params, the
// enterprise-only region / endpoint-mode handling, telemetry types, shard hash
// keys, batching / sender-queue setup, APM telemetry types and the parameters
// forwarded to the Go pipeline.
//
// Every scenario builds a fresh flusher. The sender queues are cleared after
// each scenario that is followed by another native-flush scenario, so a queue
// created by one scenario is not picked up by the next one.
void FlusherSLSUnittest::OnSuccessfulInit() {
    unique_ptr<FlusherSLS> flusher;
    Json::Value configJson, optionalGoPipelineJson, optionalGoPipeline;
    string configStr, optionalGoPipelineStr, errorMsg;

    // Replaces `flusher` with a new instance wired to the test context.
    // Assertions are deliberately kept out of this helper so that a failure is
    // reported at the line of the scenario that triggered it.
    auto resetFlusher = [&]() {
        flusher.reset(new FlusherSLS());
        flusher->SetContext(ctx);
        flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
    };

    // only mandatory param
    configStr = R"(
        {
            "Type": "flusher_sls",
            "Project": "test_project",
            "Logstore": "test_logstore"
        }
    )";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
#ifndef __ENTERPRISE__
    configJson["Endpoint"] = "test_region.log.aliyuncs.com";
#endif
    resetFlusher();
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_TRUE(optionalGoPipeline.isNull());
    APSARA_TEST_EQUAL("test_project", flusher->mProject);
    APSARA_TEST_EQUAL("test_logstore", flusher->mLogstore);
    APSARA_TEST_EQUAL(STRING_FLAG(default_region_name), flusher->mRegion);
#ifndef __ENTERPRISE__
    APSARA_TEST_EQUAL("test_region.log.aliyuncs.com", flusher->mEndpoint);
#endif
    APSARA_TEST_EQUAL("", flusher->mAliuid);
    APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS, flusher->mTelemetryType);
    APSARA_TEST_TRUE(flusher->mShardHashKeys.empty());
    APSARA_TEST_EQUAL(static_cast<uint32_t>(INT32_FLAG(merge_log_count_limit)),
                      flusher->mBatcher.GetEventFlushStrategy().GetMinCnt());
    APSARA_TEST_EQUAL(
        static_cast<uint32_t>(INT32_FLAG(max_send_log_group_size) / DOUBLE_FLAG(sls_serialize_size_expansion_ratio)),
        flusher->mBatcher.GetEventFlushStrategy().GetMaxSizeBytes());
    APSARA_TEST_EQUAL(static_cast<uint32_t>(INT32_FLAG(batch_send_metric_size)),
                      flusher->mBatcher.GetEventFlushStrategy().GetMinSizeBytes());
    // The send interval is split between the two batching stages: the group
    // stage is expected to get the (rounded-down) half, the event stage the rest.
    uint32_t timeout = static_cast<uint32_t>(INT32_FLAG(batch_send_interval)) / 2;
    APSARA_TEST_EQUAL(static_cast<uint32_t>(INT32_FLAG(batch_send_interval)) - timeout,
                      flusher->mBatcher.GetEventFlushStrategy().GetTimeoutSecs());
    APSARA_TEST_TRUE(flusher->mBatcher.GetGroupFlushStrategy().has_value());
    APSARA_TEST_EQUAL(static_cast<uint32_t>(INT32_FLAG(batch_send_metric_size)),
                      flusher->mBatcher.GetGroupFlushStrategy()->GetMinSizeBytes());
    APSARA_TEST_EQUAL(timeout, flusher->mBatcher.GetGroupFlushStrategy()->GetTimeoutSecs());
    APSARA_TEST_TRUE(flusher->mGroupSerializer);
    APSARA_TEST_TRUE(flusher->mGroupListSerializer);
    APSARA_TEST_EQUAL(CompressType::LZ4, flusher->mCompressor->GetCompressType());
    APSARA_TEST_EQUAL(QueueKeyManager::GetInstance()->GetKey("test_config-flusher_sls-test_project#test_logstore"),
                      flusher->GetQueueKey());
    auto que = SenderQueueManager::GetInstance()->GetQueue(flusher->GetQueueKey());
    APSARA_TEST_NOT_EQUAL(nullptr, que);
    APSARA_TEST_FALSE(que->GetRateLimiter().has_value());
    APSARA_TEST_EQUAL(3U, que->GetConcurrencyLimiters().size());
    // Drop the queue like every other scenario does; `que` must not be used
    // past this point.
    SenderQueueManager::GetInstance()->Clear();

    // valid optional param
    configStr = R"(
        {
            "Type": "flusher_sls",
            "Project": "test_project",
            "Logstore": "test_logstore",
            "Region": "test_region",
            "Endpoint": "test_region.log.aliyuncs.com",
            "Aliuid": "123456789",
            "TelemetryType": "logs",
            "ShardHashKeys": [
                "__source__"
            ]
        }
    )";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    // EndpointMode is only asserted on enterprise builds (see mEndpointMode
    // below), so that is the build on which it has to be supplied.
#ifdef __ENTERPRISE__
    configJson["EndpointMode"] = "default";
#endif
    resetFlusher();
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_EQUAL("test_region", flusher->mRegion);
#ifdef __ENTERPRISE__
    APSARA_TEST_EQUAL("123456789", flusher->mAliuid);
    APSARA_TEST_EQUAL(EndpointMode::DEFAULT, flusher->mEndpointMode);
#else
    APSARA_TEST_EQUAL("", flusher->mAliuid);
#endif
    APSARA_TEST_EQUAL("test_region.log.aliyuncs.com", flusher->mEndpoint);
    APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS, flusher->mTelemetryType);
    APSARA_TEST_EQUAL(1U, flusher->mShardHashKeys.size());
    APSARA_TEST_EQUAL("__source__", flusher->mShardHashKeys[0]);
    SenderQueueManager::GetInstance()->Clear();

    // invalid optional param: every optional value has the wrong JSON type;
    // Init must still succeed and the members must hold their defaults
    configStr = R"(
        {
            "Type": "flusher_sls",
            "Project": "test_project",
            "Logstore": "test_logstore",
            "Region": true,
            "Aliuid": true,
            "TelemetryType": true,
            "ShardHashKeys": true
        }
    )";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
#ifdef __ENTERPRISE__
    configJson["EndpointMode"] = true;
    configJson["Endpoint"] = true;
#else
    configJson["Endpoint"] = "test_region.log.aliyuncs.com";
#endif
    resetFlusher();
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_EQUAL(STRING_FLAG(default_region_name), flusher->mRegion);
#ifdef __ENTERPRISE__
    APSARA_TEST_EQUAL(EndpointMode::DEFAULT, flusher->mEndpointMode);
    APSARA_TEST_EQUAL("", flusher->mEndpoint);
#endif
    APSARA_TEST_EQUAL("", flusher->mAliuid);
    APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS, flusher->mTelemetryType);
    APSARA_TEST_TRUE(flusher->mShardHashKeys.empty());
    SenderQueueManager::GetInstance()->Clear();

#ifdef __ENTERPRISE__
    // region: with the private-cloud switch on, the configured Region is
    // expected to be replaced by the default region name
    EnterpriseConfigProvider::GetInstance()->mIsPrivateCloud = true;
    configStr = R"(
        {
            "Type": "flusher_sls",
            "Project": "test_project",
            "Logstore": "test_logstore",
            "Region": "test_region",
            "Endpoint": "test_region.log.aliyuncs.com"
        }
    )";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    resetFlusher();
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_EQUAL(STRING_FLAG(default_region_name), flusher->mRegion);
    EnterpriseConfigProvider::GetInstance()->mIsPrivateCloud = false;
    SenderQueueManager::GetInstance()->Clear();
#endif

#ifdef __ENTERPRISE__
    // EndpointMode && Endpoint
    EnterpriseSLSClientManager::GetInstance()->Clear();
    // Endpoint ignored in accelerate mode
    configStr = R"(
        {
            "Type": "flusher_sls",
            "Project": "test_project",
            "Logstore": "test_logstore",
            "Region": "test_region",
            "EndpointMode": "accelerate",
            "Endpoint": "  test_region.log.aliyuncs.com   "
        }
    )";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    resetFlusher();
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_EQUAL(EndpointMode::ACCELERATE, flusher->mEndpointMode);
    APSARA_TEST_EQUAL(EnterpriseSLSClientManager::GetInstance()->mRegionCandidateEndpointsMap.end(),
                      EnterpriseSLSClientManager::GetInstance()->mRegionCandidateEndpointsMap.find("test_region"));
    APSARA_TEST_EQUAL(flusher->mProject, flusher->mCandidateHostsInfo->GetProject());
    APSARA_TEST_EQUAL(flusher->mRegion, flusher->mCandidateHostsInfo->GetRegion());
    APSARA_TEST_EQUAL(EndpointMode::ACCELERATE, flusher->mCandidateHostsInfo->GetMode());
    SenderQueueManager::GetInstance()->Clear();

    // region remote endpoints not existed (an unrecognised EndpointMode is
    // expected to fall back to the default mode)
    configStr = R"(
        {
            "Type": "flusher_sls",
            "Project": "test_project",
            "Logstore": "test_logstore",
            "Region": "test_region",
            "EndpointMode": "unknown",
            "Endpoint": "  test_region.log.aliyuncs.com   "
        }
    )";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    resetFlusher();
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_EQUAL(EndpointMode::DEFAULT, flusher->mEndpointMode);
    auto& endpoints
        = EnterpriseSLSClientManager::GetInstance()->mRegionCandidateEndpointsMap["test_region"].mRemoteEndpoints;
    APSARA_TEST_EQUAL(1U, endpoints.size());
    APSARA_TEST_EQUAL("test_region.log.aliyuncs.com", endpoints[0]);
    APSARA_TEST_EQUAL(flusher->mProject, flusher->mCandidateHostsInfo->GetProject());
    APSARA_TEST_EQUAL(flusher->mRegion, flusher->mCandidateHostsInfo->GetRegion());
    APSARA_TEST_EQUAL(EndpointMode::DEFAULT, flusher->mCandidateHostsInfo->GetMode());
    SenderQueueManager::GetInstance()->Clear();

    // region remote endpoints existed
    configStr = R"(
        {
            "Type": "flusher_sls",
            "Project": "test_project",
            "Logstore": "test_logstore",
            "Region": "test_region",
            "EndpointMode": "default",
            "Endpoint": "  test_region-intranet.log.aliyuncs.com   "
        }
    )";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    resetFlusher();
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_EQUAL(EndpointMode::DEFAULT, flusher->mEndpointMode);
    // Look the entry up again and bind a new reference: assigning to
    // `endpoints` would copy the vector onto itself instead of re-reading it.
    auto& updatedEndpoints
        = EnterpriseSLSClientManager::GetInstance()->mRegionCandidateEndpointsMap["test_region"].mRemoteEndpoints;
    APSARA_TEST_EQUAL(2U, updatedEndpoints.size());
    APSARA_TEST_EQUAL("test_region.log.aliyuncs.com", updatedEndpoints[0]);
    APSARA_TEST_EQUAL("test_region-intranet.log.aliyuncs.com", updatedEndpoints[1]);
    APSARA_TEST_EQUAL(flusher->mProject, flusher->mCandidateHostsInfo->GetProject());
    APSARA_TEST_EQUAL(flusher->mRegion, flusher->mCandidateHostsInfo->GetRegion());
    APSARA_TEST_EQUAL(EndpointMode::DEFAULT, flusher->mCandidateHostsInfo->GetMode());
    SenderQueueManager::GetInstance()->Clear();
#endif

#ifndef __ENTERPRISE__
    // Endpoint: surrounding whitespace is expected to be trimmed
    configStr = R"(
        {
            "Type": "flusher_sls",
            "Project": "test_project",
            "Logstore": "test_logstore",
            "Region": "test_region",
            "Endpoint": "  test_region.log.aliyuncs.com   "
        }
    )";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    resetFlusher();
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_EQUAL("test_region.log.aliyuncs.com", flusher->mEndpoint);
    SenderQueueManager::GetInstance()->Clear();
#endif

    // TelemetryType
    configStr = R"(
        {
            "Type": "flusher_sls",
            "Project": "test_project",
            "Logstore": "test_logstore",
            "Region": "test_region",
            "Endpoint": "test_region.log.aliyuncs.com",
            "TelemetryType": "metrics"
        }
    )";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    resetFlusher();
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_METRICS, flusher->mTelemetryType);
    SenderQueueManager::GetInstance()->Clear();

    // an unrecognised telemetry type is expected to fall back to logs
    configStr = R"(
        {
            "Type": "flusher_sls",
            "Project": "test_project",
            "Logstore": "test_logstore",
            "Region": "test_region",
            "Endpoint": "test_region.log.aliyuncs.com",
            "TelemetryType": "unknown"
        }
    )";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    resetFlusher();
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS, flusher->mTelemetryType);
    SenderQueueManager::GetInstance()->Clear();

    // ShardHashKeys: expected to be dropped when the exactly-once flag is set
    configStr = R"(
        {
            "Type": "flusher_sls",
            "Project": "test_project",
            "Logstore": "test_logstore",
            "Region": "test_region",
            "Endpoint": "test_region.log.aliyuncs.com",
            "ShardHashKeys": [
                "__source__"
            ]
        }
    )";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    ctx.SetExactlyOnceFlag(true);
    resetFlusher();
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_TRUE(flusher->mShardHashKeys.empty());
    ctx.SetExactlyOnceFlag(false);
    SenderQueueManager::GetInstance()->Clear();

    // group batch && sender queue: no group flush strategy is expected when
    // shard hash keys are configured ...
    configStr = R"(
        {
            "Type": "flusher_sls",
            "Project": "test_project",
            "Logstore": "test_logstore",
            "Region": "test_region",
            "Endpoint": "test_region.log.aliyuncs.com",
            "ShardHashKeys": [
                "__source__"
            ]
        }
    )";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    resetFlusher();
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_FALSE(flusher->mBatcher.GetGroupFlushStrategy().has_value());
    SenderQueueManager::GetInstance()->Clear();

    // ... nor with the exactly-once flag set, in which case no sender queue
    // is expected to be registered for the flusher either ...
    configStr = R"(
        {
            "Type": "flusher_sls",
            "Project": "test_project",
            "Logstore": "test_logstore",
            "Region": "test_region",
            "Endpoint": "test_region.log.aliyuncs.com"
        }
    )";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    ctx.SetExactlyOnceFlag(true);
    resetFlusher();
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_FALSE(flusher->mBatcher.GetGroupFlushStrategy().has_value());
    APSARA_TEST_EQUAL(nullptr, SenderQueueManager::GetInstance()->GetQueue(flusher->GetQueueKey()));
    ctx.SetExactlyOnceFlag(false);
    SenderQueueManager::GetInstance()->Clear();

    // ... nor for the metrics telemetry type
    configStr = R"(
        {
            "Type": "flusher_sls",
            "Project": "test_project",
            "Logstore": "test_logstore",
            "Region": "test_region",
            "Endpoint": "test_region.log.aliyuncs.com",
            "TelemetryType": "metrics"
        }
    )";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    resetFlusher();
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_FALSE(flusher->mBatcher.GetGroupFlushStrategy().has_value());
    SenderQueueManager::GetInstance()->Clear();

    // apm: the three vectors below are parallel - entry i of apmConfigStr is
    // expected to yield apmSubpath[i] and apmTelemetryTypes[i]
    std::vector<std::string> apmConfigStr = {R"(
        {
            "Type": "flusher_sls",
            "TelemetryType": "arms_traces",
            "Project": "test_project",
            "Region": "test_region",
            "Endpoint": "test_endpoint",
            "Match": {
                "Type": "tag",
                "Key": "data_type",
                "Value": "trace"
            }
        }
    )",
                                             R"(
        {
            "Type": "flusher_sls",
            "TelemetryType": "arms_metrics",
            "Project": "test_project",
            "Region": "test_region",
            "Endpoint": "test_endpoint",
            "Match": {
                "Type": "tag",
                "Key": "data_type",
                "Value": "metric"
            }
        }
    )",
                                             R"(
        {
            "Type": "flusher_sls",
            "TelemetryType": "arms_agentinfo",
            "Project": "test_project",
            "Region": "test_region",
            "Endpoint": "test_endpoint",
            "Match": {
                "Type": "tag",
                "Key": "data_type",
                "Value": "agent_info"
            }
        }
    )"};
    std::vector<std::string> apmSubpath = {APM_TRACES_URL, APM_METRICS_URL, APM_AGENTINFOS_URL};
    std::vector<sls_logs::SlsTelemetryType> apmTelemetryTypes = {
        sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_TRACES,
        sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_METRICS,
        sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_APM_AGENTINFOS,
    };
    for (size_t ii = 0; ii < apmConfigStr.size(); ii++) {
        auto& cfg = apmConfigStr[ii];
        APSARA_TEST_TRUE(ParseJsonTable(cfg, configJson, errorMsg));
        resetFlusher();
        APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
        APSARA_TEST_EQUAL(flusher->mSubpath, apmSubpath[ii]);
        APSARA_TEST_EQUAL(flusher->mTelemetryType, apmTelemetryTypes[ii]);
        SenderQueueManager::GetInstance()->Clear();
    }

    // go param: when flushing through the Go pipeline, Init is expected to
    // fill optionalGoPipeline with the flusher entry for the Go side
    ctx.SetIsFlushingThroughGoPipelineFlag(true);
    // No trailing comma after the last member: strict JSON forbids it and
    // not every parser configuration accepts it.
    configStr = R"(
        {
            "Type": "flusher_sls",
            "Project": "test_project",
            "Logstore": "test_logstore",
            "Region": "cn-hangzhou",
            "Endpoint": "cn-hangzhou.log.aliyuncs.com"
        }
    )";
    optionalGoPipelineStr = R"(
        {
            "flushers": [
                {
                    "type": "flusher_sls/4",
                    "detail": {}
                }
            ]
        }
    )";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    APSARA_TEST_TRUE(ParseJsonTable(optionalGoPipelineStr, optionalGoPipelineJson, errorMsg));
    // Set the plugin ID counter to the value the expected "flusher_sls/4" is
    // written against.
    pipeline.mPluginID.store(4);
    resetFlusher();
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_TRUE(optionalGoPipelineJson == optionalGoPipeline);
    optionalGoPipeline.clear();

    configStr = R"(
        {
            "Type": "flusher_sls",
            "Project": "test_project",
            "Logstore": "test_logstore",
            "Region": "test_region",
            "Endpoint": "test_region.log.aliyuncs.com",
            "EnableShardHash": false
        }
    )";
    optionalGoPipelineStr = R"(
        {
            "flushers": [
                {
                    "type": "flusher_sls/4",
                    "detail": {
                        "EnableShardHash": false
                    }
                }
            ]
        }
    )";
    APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
    APSARA_TEST_TRUE(ParseJsonTable(optionalGoPipelineStr, optionalGoPipelineJson, errorMsg));
    pipeline.mPluginID.store(4);
    resetFlusher();
    APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
    APSARA_TEST_EQUAL(optionalGoPipelineJson.toStyledString(), optionalGoPipeline.toStyledString());
    ctx.SetIsFlushingThroughGoPipelineFlag(false);
    SenderQueueManager::GetInstance()->Clear();
}