bool FlusherSLS::Init()

in core/plugin/flusher/sls/FlusherSLS.cpp [285:568]


// Initializes the SLS flusher from its JSON plugin configuration.
//
// Steps, in order:
//   1. Reads the mandatory "Project" param.
//   2. Reads the optional "TelemetryType" param and maps it onto mTelemetryType (and mSubpath for the
//      arms_* types); unknown values produce a warning that names "logs" as the default.
//   3. Reads "Logstore", which is mandatory only for the logs and metrics telemetry types.
//   4. Reads "Region"; enterprise builds additionally read "Aliuid", "EndpointMode" and an optional
//      "Endpoint", while other builds require a non-empty "Endpoint".
//   5. Reads "Batch" (including the deprecated "Batch.ShardHashKeys") and the top-level "ShardHashKeys".
//   6. Initializes the batcher, compressor, serializers, sender queue, Go plugin part and metric counters.
//
// @param config              JSON configuration of this flusher plugin.
// @param optionalGoPipeline  forwarded to GenerateGoPlugin together with config.
// @return true on success; false when mBatcher.Init fails. The PARAM_ERROR_RETURN branches are also
//         expected to leave the function early (macro definition not visible here — presumably
//         returning false).
bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline) {
    string errorMsg;

    // Project
    if (!GetMandatoryStringParam(config, "Project", mProject, errorMsg)) {
        PARAM_ERROR_RETURN(mContext->GetLogger(),
                           mContext->GetAlarm(),
                           errorMsg,
                           sName,
                           mContext->GetConfigName(),
                           mContext->GetProjectName(),
                           mContext->GetLogstoreName(),
                           mContext->GetRegion());
    }

    // TelemetryType
    string telemetryType;
    if (!GetOptionalStringParam(config, "TelemetryType", telemetryType, errorMsg)) {
        PARAM_WARNING_DEFAULT(mContext->GetLogger(),
                              mContext->GetAlarm(),
                              errorMsg,
                              "logs",
                              sName,
                              mContext->GetConfigName(),
                              mContext->GetProjectName(),
                              mContext->GetLogstoreName(),
                              mContext->GetRegion());
    } else if (telemetryType == "metrics") {
        // TelemetryType set to metrics; honored only when the metricstore channel flag is enabled,
        // otherwise the flusher stays on the logs telemetry type.
        mTelemetryType = BOOL_FLAG(enable_metricstore_channel) ? sls_logs::SLS_TELEMETRY_TYPE_METRICS
                                                               : sls_logs::SLS_TELEMETRY_TYPE_LOGS;
    } else if (telemetryType == "arms_agentinfo") {
        mSubpath = APM_AGENTINFOS_URL;
        mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS;
    } else if (telemetryType == "arms_metrics") {
        mSubpath = APM_METRICS_URL;
        mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS;
    } else if (telemetryType == "arms_traces") {
        mSubpath = APM_TRACES_URL;
        mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES;
    } else if (!telemetryType.empty() && telemetryType != "logs") {
        // TelemetryType invalid
        PARAM_WARNING_DEFAULT(mContext->GetLogger(),
                              mContext->GetAlarm(),
                              "string param TelemetryType is not valid",
                              "logs",
                              sName,
                              mContext->GetConfigName(),
                              mContext->GetProjectName(),
                              mContext->GetLogstoreName(),
                              mContext->GetRegion());
    }

    // Logstore
    if (mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_LOGS || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_METRICS) {
        // log and metric
        if (!GetMandatoryStringParam(config, "Logstore", mLogstore, errorMsg)) {
            PARAM_ERROR_RETURN(mContext->GetLogger(),
                               mContext->GetAlarm(),
                               errorMsg,
                               sName,
                               mContext->GetConfigName(),
                               mContext->GetProjectName(),
                               mContext->GetLogstoreName(),
                               mContext->GetRegion());
        }
    }

    // Region
    // In enterprise builds the param is not read at all when the data server is a private cloud.
    if (
#ifdef __ENTERPRISE__
        !EnterpriseConfigProvider::GetInstance()->IsDataServerPrivateCloud() &&
#endif
        !GetOptionalStringParam(config, "Region", mRegion, errorMsg)) {
        PARAM_WARNING_DEFAULT(mContext->GetLogger(),
                              mContext->GetAlarm(),
                              errorMsg,
                              mRegion,
                              sName,
                              mContext->GetConfigName(),
                              mContext->GetProjectName(),
                              mContext->GetLogstoreName(),
                              mContext->GetRegion());
    }

#ifdef __ENTERPRISE__
    // Aliuid
    if (!GetOptionalStringParam(config, "Aliuid", mAliuid, errorMsg)) {
        PARAM_WARNING_IGNORE(mContext->GetLogger(),
                             mContext->GetAlarm(),
                             errorMsg,
                             sName,
                             mContext->GetConfigName(),
                             mContext->GetProjectName(),
                             mContext->GetLogstoreName(),
                             mContext->GetRegion());
    }

    // EndpointMode
    string endpointMode = "default";
    if (!GetOptionalStringParam(config, "EndpointMode", endpointMode, errorMsg)) {
        PARAM_WARNING_DEFAULT(mContext->GetLogger(),
                              mContext->GetAlarm(),
                              errorMsg,
                              endpointMode,
                              sName,
                              mContext->GetConfigName(),
                              mContext->GetProjectName(),
                              mContext->GetLogstoreName(),
                              mContext->GetRegion());
    }
    if (endpointMode == "accelerate") {
        mEndpointMode = EndpointMode::ACCELERATE;
    } else if (endpointMode != "default") {
        PARAM_WARNING_DEFAULT(mContext->GetLogger(),
                              mContext->GetAlarm(),
                              "string param EndpointMode is not valid",
                              "default",
                              sName,
                              mContext->GetConfigName(),
                              mContext->GetProjectName(),
                              mContext->GetLogstoreName(),
                              mContext->GetRegion());
    }
    if (mEndpointMode == EndpointMode::DEFAULT) {
        // for local pipeline whose flusher region is neither specified in local info nor included by config provider,
        // param Endpoint should be used, and the mode is set to default.
        // warning: if inconsistency exists among configs, only the first config would be considered in this situation.
        if (!GetOptionalStringParam(config, "Endpoint", mEndpoint, errorMsg)) {
            PARAM_WARNING_IGNORE(mContext->GetLogger(),
                                 mContext->GetAlarm(),
                                 errorMsg,
                                 sName,
                                 mContext->GetConfigName(),
                                 mContext->GetProjectName(),
                                 mContext->GetLogstoreName(),
                                 mContext->GetRegion());
        }
        // NOTE(review): mEndpoint is appended even when the param is absent or invalid (it may be empty
        // here) — confirm that UpdateRemoteRegionEndpoints tolerates an empty endpoint.
        EnterpriseSLSClientManager::GetInstance()->UpdateRemoteRegionEndpoints(
            mRegion, {mEndpoint}, EnterpriseSLSClientManager::RemoteEndpointUpdateAction::APPEND);
    }
    mCandidateHostsInfo
        = EnterpriseSLSClientManager::GetInstance()->GetCandidateHostsInfo(mRegion, mProject, mEndpointMode);
    LOG_INFO(mContext->GetLogger(),
             ("get candidate hosts info, region", mRegion)("project", mProject)("logstore", mLogstore)(
                 "endpoint mode", EndpointModeToString(mCandidateHostsInfo->GetMode())));
#else
    // Endpoint
    if (!GetMandatoryStringParam(config, "Endpoint", mEndpoint, errorMsg)) {
        PARAM_ERROR_RETURN(mContext->GetLogger(),
                           mContext->GetAlarm(),
                           errorMsg,
                           sName,
                           mContext->GetConfigName(),
                           mContext->GetProjectName(),
                           mContext->GetLogstoreName(),
                           mContext->GetRegion());
    }
    mEndpoint = TrimString(mEndpoint);
    if (mEndpoint.empty()) {
        PARAM_ERROR_RETURN(mContext->GetLogger(),
                           mContext->GetAlarm(),
                           "param Endpoint is empty",
                           sName,
                           mContext->GetConfigName(),
                           mContext->GetProjectName(),
                           mContext->GetLogstoreName(),
                           mContext->GetRegion());
    }
#endif


    // Batch
    const char* key = "Batch";
    const Json::Value* itr = config.find(key, key + strlen(key));
    if (itr && !itr->isObject()) {
        PARAM_WARNING_IGNORE(mContext->GetLogger(),
                             mContext->GetAlarm(),
                             "param Batch is not of type object",
                             sName,
                             mContext->GetConfigName(),
                             mContext->GetProjectName(),
                             mContext->GetLogstoreName(),
                             mContext->GetRegion());
        // Actually ignore the malformed value: key lookups on a non-object Json::Value are invalid in
        // jsoncpp, so a non-object Batch must not be handed to the param helpers or to the batcher.
        // From here on it is treated exactly as if the key were absent.
        itr = nullptr;
    }
    if (itr) {
        // Deprecated, use ShardHashKeys instead
        if (!GetOptionalListParam<string>(*itr, "Batch.ShardHashKeys", mShardHashKeys, errorMsg)) {
            PARAM_WARNING_IGNORE(mContext->GetLogger(),
                                 mContext->GetAlarm(),
                                 errorMsg,
                                 sName,
                                 mContext->GetConfigName(),
                                 mContext->GetProjectName(),
                                 mContext->GetLogstoreName(),
                                 mContext->GetRegion());
        }
    }

    // ShardHashKeys
    // Only read for the logs telemetry type when exactly once is not enabled.
    if (mTelemetryType == sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS && !mContext->IsExactlyOnceEnabled()) {
        if (!GetOptionalListParam<string>(config, "ShardHashKeys", mShardHashKeys, errorMsg)) {
            PARAM_WARNING_IGNORE(mContext->GetLogger(),
                                 mContext->GetAlarm(),
                                 errorMsg,
                                 sName,
                                 mContext->GetConfigName(),
                                 mContext->GetProjectName(),
                                 mContext->GetLogstoreName(),
                                 mContext->GetRegion());
        }
    }

    // Default flush strategy values are taken from process-level flags.
    DefaultFlushStrategyOptions strategy{
        static_cast<uint32_t>(INT32_FLAG(max_send_log_group_size) / DOUBLE_FLAG(sls_serialize_size_expansion_ratio)),
        static_cast<uint32_t>(INT32_FLAG(batch_send_metric_size)),
        static_cast<uint32_t>(INT32_FLAG(merge_log_count_limit)),
        static_cast<uint32_t>(INT32_FLAG(batch_send_interval))};
    if (!mBatcher.Init(itr ? *itr : Json::Value(),
                       this,
                       strategy,
                       !mContext->IsExactlyOnceEnabled() && mShardHashKeys.empty()
                           && mTelemetryType != sls_logs::SLS_TELEMETRY_TYPE_METRICS)) {
        // when either exactly once is enabled or ShardHashKeys is not empty or telemetry type is metrics, we don't
        // enable group batch
        return false;
    }

    // CompressType
    if (BOOL_FLAG(sls_client_send_compress)) {
        mCompressor = CompressorFactory::GetInstance()->Create(config, *mContext, sName, mPluginID, CompressType::LZ4);
    }

    mGroupSerializer = make_unique<SLSEventGroupSerializer>(this);
    mGroupListSerializer = make_unique<SLSEventGroupListSerializer>(this);

    // MaxSendRate
    // For legacy reason, MaxSendRate should be int, where negative number means unlimited. However, this can be
    // compatible with the following logic.
    if (!GetOptionalUIntParam(config, "MaxSendRate", mMaxSendRate, errorMsg)) {
        PARAM_WARNING_DEFAULT(mContext->GetLogger(),
                              mContext->GetAlarm(),
                              errorMsg,
                              mMaxSendRate,
                              sName,
                              mContext->GetConfigName(),
                              mContext->GetProjectName(),
                              mContext->GetLogstoreName(),
                              mContext->GetRegion());
    }

    // The sender queue is created here only when exactly once is not enabled.
    if (!mContext->IsExactlyOnceEnabled()) {
        GenerateQueueKey(mProject + "#" + mLogstore);
        SenderQueueManager::GetInstance()->CreateQueue(
            mQueueKey,
            mPluginID,
            *mContext,
            {{"region", GetRegionConcurrencyLimiter(mRegion)},
             {"project", GetProjectConcurrencyLimiter(mProject)},
             {"logstore", GetLogstoreConcurrencyLimiter(mProject, mLogstore)}},
            mMaxSendRate);
    }

    GenerateGoPlugin(config, optionalGoPipeline);

    // Metric counters
    mSendCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL);
    mSendDoneCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SEND_DONE_TOTAL);
    mSuccessCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SUCCESS_TOTAL);
    mDiscardCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_DISCARD_TOTAL);
    mNetworkErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_NETWORK_ERROR_TOTAL);
    mServerErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SERVER_ERROR_TOTAL);
    mShardWriteQuotaErrorCnt
        = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_SHARD_WRITE_QUOTA_ERROR_TOTAL);
    mProjectQuotaErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_PROJECT_QUOTA_ERROR_TOTAL);
    mUnauthErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_UNAUTH_ERROR_TOTAL);
    mParamsErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_PARAMS_ERROR_TOTAL);
    mSequenceIDErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_SEQUENCE_ID_ERROR_TOTAL);
    mRequestExpiredErrorCnt
        = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_REQUEST_EXPRIRED_ERROR_TOTAL);
    mOtherErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_OTHER_ERROR_TOTAL);

    return true;
}