in core/plugin/flusher/sls/FlusherSLS.cpp [285:568]
bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline) {
string errorMsg;
// Project
if (!GetMandatoryStringParam(config, "Project", mProject, errorMsg)) {
PARAM_ERROR_RETURN(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
// TelemetryType
string telemetryType;
if (!GetOptionalStringParam(config, "TelemetryType", telemetryType, errorMsg)) {
PARAM_WARNING_DEFAULT(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
"logs",
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
} else if (telemetryType == "metrics") {
// TelemetryType set to metrics
mTelemetryType = BOOL_FLAG(enable_metricstore_channel) ? sls_logs::SLS_TELEMETRY_TYPE_METRICS
: sls_logs::SLS_TELEMETRY_TYPE_LOGS;
} else if (telemetryType == "arms_agentinfo") {
mSubpath = APM_AGENTINFOS_URL;
mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS;
} else if (telemetryType == "arms_metrics") {
mSubpath = APM_METRICS_URL;
mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS;
} else if (telemetryType == "arms_traces") {
mSubpath = APM_TRACES_URL;
mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES;
} else if (!telemetryType.empty() && telemetryType != "logs") {
// TelemetryType invalid
PARAM_WARNING_DEFAULT(mContext->GetLogger(),
mContext->GetAlarm(),
"string param TelemetryType is not valid",
"logs",
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
// Logstore
if (mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_LOGS || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_METRICS) {
// log and metric
if (!GetMandatoryStringParam(config, "Logstore", mLogstore, errorMsg)) {
PARAM_ERROR_RETURN(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
}
// Region
if (
#ifdef __ENTERPRISE__
!EnterpriseConfigProvider::GetInstance()->IsDataServerPrivateCloud() &&
#endif
!GetOptionalStringParam(config, "Region", mRegion, errorMsg)) {
PARAM_WARNING_DEFAULT(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
mRegion,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
#ifdef __ENTERPRISE__
// Aliuid
if (!GetOptionalStringParam(config, "Aliuid", mAliuid, errorMsg)) {
PARAM_WARNING_IGNORE(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
// EndpointMode
string endpointMode = "default";
if (!GetOptionalStringParam(config, "EndpointMode", endpointMode, errorMsg)) {
PARAM_WARNING_DEFAULT(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
endpointMode,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
if (endpointMode == "accelerate") {
mEndpointMode = EndpointMode::ACCELERATE;
} else if (endpointMode != "default") {
PARAM_WARNING_DEFAULT(mContext->GetLogger(),
mContext->GetAlarm(),
"string param EndpointMode is not valid",
"default",
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
if (mEndpointMode == EndpointMode::DEFAULT) {
// for local pipeline whose flusher region is neither specified in local info nor included by config provider,
// param Endpoint should be used, and the mode is set to default.
// warning: if inconsistency exists among configs, only the first config would be considered in this situation.
if (!GetOptionalStringParam(config, "Endpoint", mEndpoint, errorMsg)) {
PARAM_WARNING_IGNORE(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
EnterpriseSLSClientManager::GetInstance()->UpdateRemoteRegionEndpoints(
mRegion, {mEndpoint}, EnterpriseSLSClientManager::RemoteEndpointUpdateAction::APPEND);
}
mCandidateHostsInfo
= EnterpriseSLSClientManager::GetInstance()->GetCandidateHostsInfo(mRegion, mProject, mEndpointMode);
LOG_INFO(mContext->GetLogger(),
("get candidate hosts info, region", mRegion)("project", mProject)("logstore", mLogstore)(
"endpoint mode", EndpointModeToString(mCandidateHostsInfo->GetMode())));
#else
// Endpoint
if (!GetMandatoryStringParam(config, "Endpoint", mEndpoint, errorMsg)) {
PARAM_ERROR_RETURN(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
mEndpoint = TrimString(mEndpoint);
if (mEndpoint.empty()) {
PARAM_ERROR_RETURN(mContext->GetLogger(),
mContext->GetAlarm(),
"param Endpoint is empty",
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
#endif
// Batch
const char* key = "Batch";
const Json::Value* itr = config.find(key, key + strlen(key));
if (itr) {
if (!itr->isObject()) {
PARAM_WARNING_IGNORE(mContext->GetLogger(),
mContext->GetAlarm(),
"param Batch is not of type object",
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
// Deprecated, use ShardHashKeys instead
if (!GetOptionalListParam<string>(*itr, "Batch.ShardHashKeys", mShardHashKeys, errorMsg)) {
PARAM_WARNING_IGNORE(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
}
// ShardHashKeys
if (mTelemetryType == sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS && !mContext->IsExactlyOnceEnabled()) {
if (!GetOptionalListParam<string>(config, "ShardHashKeys", mShardHashKeys, errorMsg)) {
PARAM_WARNING_IGNORE(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
}
DefaultFlushStrategyOptions strategy{
static_cast<uint32_t>(INT32_FLAG(max_send_log_group_size) / DOUBLE_FLAG(sls_serialize_size_expansion_ratio)),
static_cast<uint32_t>(INT32_FLAG(batch_send_metric_size)),
static_cast<uint32_t>(INT32_FLAG(merge_log_count_limit)),
static_cast<uint32_t>(INT32_FLAG(batch_send_interval))};
if (!mBatcher.Init(itr ? *itr : Json::Value(),
this,
strategy,
!mContext->IsExactlyOnceEnabled() && mShardHashKeys.empty()
&& mTelemetryType != sls_logs::SLS_TELEMETRY_TYPE_METRICS)) {
// when either exactly once is enabled or ShardHashKeys is not empty or telemetry type is metrics, we don't
// enable group batch
return false;
}
// CompressType
if (BOOL_FLAG(sls_client_send_compress)) {
mCompressor = CompressorFactory::GetInstance()->Create(config, *mContext, sName, mPluginID, CompressType::LZ4);
}
mGroupSerializer = make_unique<SLSEventGroupSerializer>(this);
mGroupListSerializer = make_unique<SLSEventGroupListSerializer>(this);
// MaxSendRate
// For legacy reason, MaxSendRate should be int, where negative number means unlimited. However, this can be
// compatable with the following logic.
if (!GetOptionalUIntParam(config, "MaxSendRate", mMaxSendRate, errorMsg)) {
PARAM_WARNING_DEFAULT(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
mMaxSendRate,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
if (!mContext->IsExactlyOnceEnabled()) {
GenerateQueueKey(mProject + "#" + mLogstore);
SenderQueueManager::GetInstance()->CreateQueue(
mQueueKey,
mPluginID,
*mContext,
{{"region", GetRegionConcurrencyLimiter(mRegion)},
{"project", GetProjectConcurrencyLimiter(mProject)},
{"logstore", GetLogstoreConcurrencyLimiter(mProject, mLogstore)}},
mMaxSendRate);
}
GenerateGoPlugin(config, optionalGoPipeline);
mSendCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL);
mSendDoneCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SEND_DONE_TOTAL);
mSuccessCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SUCCESS_TOTAL);
mDiscardCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_DISCARD_TOTAL);
mNetworkErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_NETWORK_ERROR_TOTAL);
mServerErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SERVER_ERROR_TOTAL);
mShardWriteQuotaErrorCnt
= GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_SHARD_WRITE_QUOTA_ERROR_TOTAL);
mProjectQuotaErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_PROJECT_QUOTA_ERROR_TOTAL);
mUnauthErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_UNAUTH_ERROR_TOTAL);
mParamsErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_PARAMS_ERROR_TOTAL);
mSequenceIDErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_SEQUENCE_ID_ERROR_TOTAL);
mRequestExpiredErrorCnt
= GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_REQUEST_EXPRIRED_ERROR_TOTAL);
mOtherErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_OTHER_ERROR_TOTAL);
return true;
}