core/plugin/flusher/sls/FlusherSLS.cpp (1,167 lines of code) (raw):

// Copyright 2023 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "plugin/flusher/sls/FlusherSLS.h" #include "app_config/AppConfig.h" #include "collection_pipeline/CollectionPipeline.h" #include "collection_pipeline/batch/FlushStrategy.h" #include "collection_pipeline/queue/QueueKeyManager.h" #include "collection_pipeline/queue/SLSSenderQueueItem.h" #include "collection_pipeline/queue/SenderQueueManager.h" #include "common/EndpointUtil.h" #include "common/Flags.h" #include "common/HashUtil.h" #include "common/LogtailCommonFlags.h" #include "common/ParamExtractor.h" #include "common/TimeUtil.h" #include "common/compression/CompressorFactory.h" #include "common/http/Constant.h" #include "common/http/HttpRequest.h" #include "plugin/flusher/sls/DiskBufferWriter.h" #include "plugin/flusher/sls/PackIdManager.h" #include "plugin/flusher/sls/SLSClientManager.h" #include "plugin/flusher/sls/SLSConstant.h" #include "plugin/flusher/sls/SLSResponse.h" #include "plugin/flusher/sls/SLSUtil.h" #include "plugin/flusher/sls/SendResult.h" #include "provider/Provider.h" #include "runner/FlusherRunner.h" #include "sls_logs.pb.h" #ifdef __ENTERPRISE__ #include "config/provider/EnterpriseConfigProvider.h" #endif using namespace std; DEFINE_FLAG_INT32(batch_send_interval, "batch sender interval (second)(default 3)", 3); DEFINE_FLAG_INT32(merge_log_count_limit, "log count in one logGroup at most", 4000); DEFINE_FLAG_INT32(batch_send_metric_size, "batch send metric size limit(bytes)(default 512KB)", 512 * 1024); DEFINE_FLAG_INT32(send_check_real_ip_interval, "seconds", 2); DEFINE_FLAG_INT32(unauthorized_send_retrytimes, "how many times should retry if PostLogStoreLogs operation return UnAuthorized", 5); DEFINE_FLAG_INT32(unauthorized_allowed_delay_after_reset, "allowed delay to retry for unauthorized error, 30s", 30); DEFINE_FLAG_INT32(discard_send_fail_interval, "discard data when send fail after 6 * 3600 seconds", 6 * 3600); DEFINE_FLAG_INT32(profile_data_send_retrytimes, "how many times should retry if profile data send fail", 5); DEFINE_FLAG_INT32(unknow_error_try_max, "discard data when try times > this value", 5); DEFINE_FLAG_BOOL(enable_metricstore_channel, "only works for metrics data for enhance metrics query performance", true); DEFINE_FLAG_INT32(max_send_log_group_size, "bytes", 10 * 1024 * 1024); DEFINE_FLAG_DOUBLE(sls_serialize_size_expansion_ratio, "", 1.2); DEFINE_FLAG_INT32(sls_request_dscp, "set dscp for sls request, from 0 to 63", -1); DECLARE_FLAG_BOOL(send_prefer_real_ip); namespace logtail { enum class OperationOnFail { RETRY_IMMEDIATELY, RETRY_LATER, DISCARD }; static const int ON_FAIL_LOG_WARNING_INTERVAL_SECOND = 10; static constexpr int64_t kInvalidHashKeySeqID = 0; static const char* GetOperationString(OperationOnFail op) { switch (op) { case OperationOnFail::RETRY_IMMEDIATELY: return "retry now"; case OperationOnFail::RETRY_LATER: return "retry later"; case OperationOnFail::DISCARD: default: return "discard data"; } } static OperationOnFail DefaultOperation(uint32_t retryTimes) { if (retryTimes > static_cast<uint32_t>(INT32_FLAG(unknow_error_try_max))) { return OperationOnFail::DISCARD; } else { return OperationOnFail::RETRY_LATER; } } void FlusherSLS::InitResource() { #ifndef APSARA_UNIT_TEST_MAIN if (!sIsResourceInited) { SLSClientManager::GetInstance()->Init(); DiskBufferWriter::GetInstance()->Init(); sIsResourceInited = true; } #endif } void FlusherSLS::RecycleResourceIfNotUsed() { #ifndef APSARA_UNIT_TEST_MAIN if (sIsResourceInited) { SLSClientManager::GetInstance()->Stop(); DiskBufferWriter::GetInstance()->Stop(); } #endif } mutex FlusherSLS::sMux; unordered_map<string, weak_ptr<ConcurrencyLimiter>> FlusherSLS::sProjectConcurrencyLimiterMap; unordered_map<string, weak_ptr<ConcurrencyLimiter>> FlusherSLS::sRegionConcurrencyLimiterMap; unordered_map<string, weak_ptr<ConcurrencyLimiter>> FlusherSLS::sLogstoreConcurrencyLimiterMap; shared_ptr<ConcurrencyLimiter> GetConcurrencyLimiter(const std::string& description) { return make_shared<ConcurrencyLimiter>(description, AppConfig::GetInstance()->GetSendRequestConcurrency()); } shared_ptr<ConcurrencyLimiter> FlusherSLS::GetLogstoreConcurrencyLimiter(const std::string& project, const std::string& logstore) { lock_guard<mutex> lock(sMux); std::string key = project + "-" + logstore; auto iter = sLogstoreConcurrencyLimiterMap.find(key); if (iter == sLogstoreConcurrencyLimiterMap.end()) { auto limiter = make_shared<ConcurrencyLimiter>(sName + "#quota#logstore#" + key, AppConfig::GetInstance()->GetSendRequestConcurrency()); sLogstoreConcurrencyLimiterMap.try_emplace(key, limiter); return limiter; } auto limiter = iter->second.lock(); if (!limiter) { limiter = make_shared<ConcurrencyLimiter>(sName + "#quota#logstore#" + key, AppConfig::GetInstance()->GetSendRequestConcurrency()); iter->second = limiter; } return limiter; } shared_ptr<ConcurrencyLimiter> FlusherSLS::GetProjectConcurrencyLimiter(const string& project) { lock_guard<mutex> lock(sMux); auto iter = sProjectConcurrencyLimiterMap.find(project); if (iter == sProjectConcurrencyLimiterMap.end()) { auto limiter = make_shared<ConcurrencyLimiter>(sName + "#quota#project#" + project, AppConfig::GetInstance()->GetSendRequestConcurrency()); sProjectConcurrencyLimiterMap.try_emplace(project, limiter); return limiter; } auto limiter = iter->second.lock(); if (!limiter) { limiter = make_shared<ConcurrencyLimiter>(sName + "#quota#project#" + project, AppConfig::GetInstance()->GetSendRequestConcurrency()); iter->second = limiter; } return limiter; } shared_ptr<ConcurrencyLimiter> FlusherSLS::GetRegionConcurrencyLimiter(const string& region) { lock_guard<mutex> lock(sMux); auto iter = sRegionConcurrencyLimiterMap.find(region); if (iter == sRegionConcurrencyLimiterMap.end()) { auto limiter = make_shared<ConcurrencyLimiter>( sName + "#network#region#" + region, AppConfig::GetInstance()->GetSendRequestConcurrency(), AppConfig::GetInstance()->GetSendRequestConcurrency() * AppConfig::GetInstance()->GetGlobalConcurrencyFreePercentageForOneRegion()); sRegionConcurrencyLimiterMap.try_emplace(region, limiter); return limiter; } auto limiter = iter->second.lock(); if (!limiter) { limiter = make_shared<ConcurrencyLimiter>( sName + "#network#region#" + region, AppConfig::GetInstance()->GetSendRequestConcurrency(), AppConfig::GetInstance()->GetSendRequestConcurrency() * AppConfig::GetInstance()->GetGlobalConcurrencyFreePercentageForOneRegion()); iter->second = limiter; } return limiter; } void FlusherSLS::ClearInvalidConcurrencyLimiters() { lock_guard<mutex> lock(sMux); for (auto iter = sProjectConcurrencyLimiterMap.begin(); iter != sProjectConcurrencyLimiterMap.end();) { if (iter->second.expired()) { iter = sProjectConcurrencyLimiterMap.erase(iter); } else { ++iter; } } for (auto iter = sRegionConcurrencyLimiterMap.begin(); iter != sRegionConcurrencyLimiterMap.end();) { if (iter->second.expired()) { iter = sRegionConcurrencyLimiterMap.erase(iter); } else { ++iter; } } for (auto iter = sLogstoreConcurrencyLimiterMap.begin(); iter != sLogstoreConcurrencyLimiterMap.end();) { if (iter->second.expired()) { iter = sLogstoreConcurrencyLimiterMap.erase(iter); } else { ++iter; } } } mutex FlusherSLS::sDefaultRegionLock; string FlusherSLS::sDefaultRegion; string FlusherSLS::GetDefaultRegion() { lock_guard<mutex> lock(sDefaultRegionLock); if (sDefaultRegion.empty()) { sDefaultRegion = STRING_FLAG(default_region_name); } return sDefaultRegion; } void FlusherSLS::SetDefaultRegion(const string& region) { lock_guard<mutex> lock(sDefaultRegionLock); sDefaultRegion = region; } mutex FlusherSLS::sProjectRegionMapLock; unordered_map<string, int32_t> FlusherSLS::sProjectRefCntMap; unordered_map<string, string> FlusherSLS::sProjectRegionMap; string FlusherSLS::GetAllProjects() { string result; lock_guard<mutex> lock(sProjectRegionMapLock); for (auto iter = sProjectRefCntMap.cbegin(); iter != sProjectRefCntMap.cend(); ++iter) { result.append(iter->first).append(" "); } return result; } std::string FlusherSLS::GetProjectRegion(const std::string& project) { lock_guard<mutex> lock(sProjectRegionMapLock); auto iter = sProjectRegionMap.find(project); if (iter == sProjectRegionMap.end()) { return ""; } return iter->second; } void FlusherSLS::IncreaseProjectRegionReferenceCnt(const string& project, const string& region) { lock_guard<mutex> lock(sProjectRegionMapLock); ++sProjectRefCntMap[project]; sProjectRegionMap[project] = region; } void FlusherSLS::DecreaseProjectRegionReferenceCnt(const string& project, const string& region) { lock_guard<mutex> lock(sProjectRegionMapLock); auto projectRefCnt = sProjectRefCntMap.find(project); if (projectRefCnt != sProjectRefCntMap.end()) { if (--projectRefCnt->second == 0) { sProjectRefCntMap.erase(projectRefCnt); sProjectRegionMap.erase(project); } } } bool FlusherSLS::sIsResourceInited = false; const string FlusherSLS::sName = "flusher_sls"; const unordered_set<string> FlusherSLS::sNativeParam = {"Project", "Logstore", "Region", "Endpoint", "EndpointMode", "Aliuid", "CompressType", "TelemetryType", "MaxSendRate", "ShardHashKeys", "Batch"}; FlusherSLS::FlusherSLS() : mRegion(GetDefaultRegion()) { } bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline) { string errorMsg; // Project if (!GetMandatoryStringParam(config, "Project", mProject, errorMsg)) { PARAM_ERROR_RETURN(mContext->GetLogger(), mContext->GetAlarm(), errorMsg, sName, mContext->GetConfigName(), mContext->GetProjectName(), mContext->GetLogstoreName(), mContext->GetRegion()); } // TelemetryType string telemetryType; if (!GetOptionalStringParam(config, "TelemetryType", telemetryType, errorMsg)) { PARAM_WARNING_DEFAULT(mContext->GetLogger(), mContext->GetAlarm(), errorMsg, "logs", sName, mContext->GetConfigName(), mContext->GetProjectName(), mContext->GetLogstoreName(), mContext->GetRegion()); } else if (telemetryType == "metrics") { // TelemetryType set to metrics mTelemetryType = BOOL_FLAG(enable_metricstore_channel) ? sls_logs::SLS_TELEMETRY_TYPE_METRICS : sls_logs::SLS_TELEMETRY_TYPE_LOGS; } else if (telemetryType == "arms_agentinfo") { mSubpath = APM_AGENTINFOS_URL; mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS; } else if (telemetryType == "arms_metrics") { mSubpath = APM_METRICS_URL; mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS; } else if (telemetryType == "arms_traces") { mSubpath = APM_TRACES_URL; mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES; } else if (!telemetryType.empty() && telemetryType != "logs") { // TelemetryType invalid PARAM_WARNING_DEFAULT(mContext->GetLogger(), mContext->GetAlarm(), "string param TelemetryType is not valid", "logs", sName, mContext->GetConfigName(), mContext->GetProjectName(), mContext->GetLogstoreName(), mContext->GetRegion()); } // Logstore if (mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_LOGS || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_METRICS) { // log and metric if (!GetMandatoryStringParam(config, "Logstore", mLogstore, errorMsg)) { PARAM_ERROR_RETURN(mContext->GetLogger(), mContext->GetAlarm(), errorMsg, sName, mContext->GetConfigName(), mContext->GetProjectName(), mContext->GetLogstoreName(), mContext->GetRegion()); } } // Region if ( #ifdef __ENTERPRISE__ !EnterpriseConfigProvider::GetInstance()->IsDataServerPrivateCloud() && #endif !GetOptionalStringParam(config, "Region", mRegion, errorMsg)) { PARAM_WARNING_DEFAULT(mContext->GetLogger(), mContext->GetAlarm(), errorMsg, mRegion, sName, mContext->GetConfigName(), mContext->GetProjectName(), mContext->GetLogstoreName(), mContext->GetRegion()); } #ifdef __ENTERPRISE__ // Aliuid if (!GetOptionalStringParam(config, "Aliuid", mAliuid, errorMsg)) { PARAM_WARNING_IGNORE(mContext->GetLogger(), mContext->GetAlarm(), errorMsg, sName, mContext->GetConfigName(), mContext->GetProjectName(), mContext->GetLogstoreName(), mContext->GetRegion()); } // EndpointMode string endpointMode = "default"; if (!GetOptionalStringParam(config, "EndpointMode", endpointMode, errorMsg)) { PARAM_WARNING_DEFAULT(mContext->GetLogger(), mContext->GetAlarm(), errorMsg, endpointMode, sName, mContext->GetConfigName(), mContext->GetProjectName(), mContext->GetLogstoreName(), mContext->GetRegion()); } if (endpointMode == "accelerate") { mEndpointMode = EndpointMode::ACCELERATE; } else if (endpointMode != "default") { PARAM_WARNING_DEFAULT(mContext->GetLogger(), mContext->GetAlarm(), "string param EndpointMode is not valid", "default", sName, mContext->GetConfigName(), mContext->GetProjectName(), mContext->GetLogstoreName(), mContext->GetRegion()); } if (mEndpointMode == EndpointMode::DEFAULT) { // for local pipeline whose flusher region is neither specified in local info nor included by config provider, // param Endpoint should be used, and the mode is set to default. // warning: if inconsistency exists among configs, only the first config would be considered in this situation. if (!GetOptionalStringParam(config, "Endpoint", mEndpoint, errorMsg)) { PARAM_WARNING_IGNORE(mContext->GetLogger(), mContext->GetAlarm(), errorMsg, sName, mContext->GetConfigName(), mContext->GetProjectName(), mContext->GetLogstoreName(), mContext->GetRegion()); } EnterpriseSLSClientManager::GetInstance()->UpdateRemoteRegionEndpoints( mRegion, {mEndpoint}, EnterpriseSLSClientManager::RemoteEndpointUpdateAction::APPEND); } mCandidateHostsInfo = EnterpriseSLSClientManager::GetInstance()->GetCandidateHostsInfo(mRegion, mProject, mEndpointMode); LOG_INFO(mContext->GetLogger(), ("get candidate hosts info, region", mRegion)("project", mProject)("logstore", mLogstore)( "endpoint mode", EndpointModeToString(mCandidateHostsInfo->GetMode()))); #else // Endpoint if (!GetMandatoryStringParam(config, "Endpoint", mEndpoint, errorMsg)) { PARAM_ERROR_RETURN(mContext->GetLogger(), mContext->GetAlarm(), errorMsg, sName, mContext->GetConfigName(), mContext->GetProjectName(), mContext->GetLogstoreName(), mContext->GetRegion()); } mEndpoint = TrimString(mEndpoint); if (mEndpoint.empty()) { PARAM_ERROR_RETURN(mContext->GetLogger(), mContext->GetAlarm(), "param Endpoint is empty", sName, mContext->GetConfigName(), mContext->GetProjectName(), mContext->GetLogstoreName(), mContext->GetRegion()); } #endif // Batch const char* key = "Batch"; const Json::Value* itr = config.find(key, key + strlen(key)); if (itr) { if (!itr->isObject()) { PARAM_WARNING_IGNORE(mContext->GetLogger(), mContext->GetAlarm(), "param Batch is not of type object", sName, mContext->GetConfigName(), mContext->GetProjectName(), mContext->GetLogstoreName(), mContext->GetRegion()); } // Deprecated, use ShardHashKeys instead if (!GetOptionalListParam<string>(*itr, "Batch.ShardHashKeys", mShardHashKeys, errorMsg)) { PARAM_WARNING_IGNORE(mContext->GetLogger(), mContext->GetAlarm(), errorMsg, sName, mContext->GetConfigName(), mContext->GetProjectName(), mContext->GetLogstoreName(), mContext->GetRegion()); } } // ShardHashKeys if (mTelemetryType == sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS && !mContext->IsExactlyOnceEnabled()) { if (!GetOptionalListParam<string>(config, "ShardHashKeys", mShardHashKeys, errorMsg)) { PARAM_WARNING_IGNORE(mContext->GetLogger(), mContext->GetAlarm(), errorMsg, sName, mContext->GetConfigName(), mContext->GetProjectName(), mContext->GetLogstoreName(), mContext->GetRegion()); } } DefaultFlushStrategyOptions strategy{ static_cast<uint32_t>(INT32_FLAG(max_send_log_group_size) / DOUBLE_FLAG(sls_serialize_size_expansion_ratio)), static_cast<uint32_t>(INT32_FLAG(batch_send_metric_size)), static_cast<uint32_t>(INT32_FLAG(merge_log_count_limit)), static_cast<uint32_t>(INT32_FLAG(batch_send_interval))}; if (!mBatcher.Init(itr ? *itr : Json::Value(), this, strategy, !mContext->IsExactlyOnceEnabled() && mShardHashKeys.empty() && mTelemetryType != sls_logs::SLS_TELEMETRY_TYPE_METRICS)) { // when either exactly once is enabled or ShardHashKeys is not empty or telemetry type is metrics, we don't // enable group batch return false; } // CompressType if (BOOL_FLAG(sls_client_send_compress)) { mCompressor = CompressorFactory::GetInstance()->Create(config, *mContext, sName, mPluginID, CompressType::LZ4); } mGroupSerializer = make_unique<SLSEventGroupSerializer>(this); mGroupListSerializer = make_unique<SLSEventGroupListSerializer>(this); // MaxSendRate // For legacy reason, MaxSendRate should be int, where negative number means unlimited. However, this can be // compatable with the following logic. if (!GetOptionalUIntParam(config, "MaxSendRate", mMaxSendRate, errorMsg)) { PARAM_WARNING_DEFAULT(mContext->GetLogger(), mContext->GetAlarm(), errorMsg, mMaxSendRate, sName, mContext->GetConfigName(), mContext->GetProjectName(), mContext->GetLogstoreName(), mContext->GetRegion()); } if (!mContext->IsExactlyOnceEnabled()) { GenerateQueueKey(mProject + "#" + mLogstore); SenderQueueManager::GetInstance()->CreateQueue( mQueueKey, mPluginID, *mContext, {{"region", GetRegionConcurrencyLimiter(mRegion)}, {"project", GetProjectConcurrencyLimiter(mProject)}, {"logstore", GetLogstoreConcurrencyLimiter(mProject, mLogstore)}}, mMaxSendRate); } GenerateGoPlugin(config, optionalGoPipeline); mSendCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL); mSendDoneCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SEND_DONE_TOTAL); mSuccessCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SUCCESS_TOTAL); mDiscardCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_DISCARD_TOTAL); mNetworkErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_NETWORK_ERROR_TOTAL); mServerErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SERVER_ERROR_TOTAL); mShardWriteQuotaErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_SHARD_WRITE_QUOTA_ERROR_TOTAL); mProjectQuotaErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_PROJECT_QUOTA_ERROR_TOTAL); mUnauthErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_UNAUTH_ERROR_TOTAL); mParamsErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_PARAMS_ERROR_TOTAL); mSequenceIDErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_SEQUENCE_ID_ERROR_TOTAL); mRequestExpiredErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_REQUEST_EXPRIRED_ERROR_TOTAL); mOtherErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_OTHER_ERROR_TOTAL); return true; } bool FlusherSLS::Start() { Flusher::Start(); IncreaseProjectRegionReferenceCnt(mProject, mRegion); return true; } bool FlusherSLS::Stop(bool isPipelineRemoving) { Flusher::Stop(isPipelineRemoving); DecreaseProjectRegionReferenceCnt(mProject, mRegion); return true; } bool FlusherSLS::Send(PipelineEventGroup&& g) { if (g.IsReplay()) { return SerializeAndPush(std::move(g)); } else { vector<BatchedEventsList> res; mBatcher.Add(std::move(g), res); return SerializeAndPush(std::move(res)); } } bool FlusherSLS::Flush(size_t key) { BatchedEventsList res; mBatcher.FlushQueue(key, res); return SerializeAndPush(std::move(res)); } bool FlusherSLS::FlushAll() { vector<BatchedEventsList> res; mBatcher.FlushAll(res); return SerializeAndPush(std::move(res)); } bool FlusherSLS::BuildRequest(SenderQueueItem* item, unique_ptr<HttpSinkRequest>& req, bool* keepItem, string* errMsg) { ADD_COUNTER(mSendCnt, 1); SLSClientManager::AuthType type; string accessKeyId, accessKeySecret; if (!SLSClientManager::GetInstance()->GetAccessKey(mAliuid, type, accessKeyId, accessKeySecret)) { #ifdef __ENTERPRISE__ if (!EnterpriseSLSClientManager::GetInstance()->GetAccessKeyIfProjectSupportsAnonymousWrite( mProject, type, accessKeyId, accessKeySecret)) { *keepItem = true; *errMsg = "failed to get access key"; return false; } #endif } auto data = static_cast<SLSSenderQueueItem*>(item); #ifdef __ENTERPRISE__ if (BOOL_FLAG(send_prefer_real_ip)) { data->mCurrentHost = EnterpriseSLSClientManager::GetInstance()->GetRealIp(mRegion); if (data->mCurrentHost.empty()) { auto info = EnterpriseSLSClientManager::GetInstance()->GetCandidateHostsInfo(mRegion, mProject, mEndpointMode); if (mCandidateHostsInfo.get() != info.get()) { LOG_INFO(sLogger, ("update candidate hosts info, region", mRegion)("project", mProject)("logstore", mLogstore)( "from", EndpointModeToString(mCandidateHostsInfo->GetMode()))( "to", EndpointModeToString(info->GetMode()))); mCandidateHostsInfo = info; } data->mCurrentHost = mCandidateHostsInfo->GetCurrentHost(); data->mRealIpFlag = false; } else { data->mRealIpFlag = true; } } else { // in case local region endpoint mode is changed, we should always check before sending auto info = EnterpriseSLSClientManager::GetInstance()->GetCandidateHostsInfo(mRegion, mProject, mEndpointMode); if (mCandidateHostsInfo == nullptr) { // TODO: temporarily used here, for send logtail alarm only, should be removed after alarm is refactored mCandidateHostsInfo = info; } if (mCandidateHostsInfo.get() != info.get()) { LOG_INFO(sLogger, ("update candidate hosts info, region", mRegion)("project", mProject)("logstore", mLogstore)( "from", EndpointModeToString(mCandidateHostsInfo->GetMode()))( "to", EndpointModeToString(info->GetMode()))); mCandidateHostsInfo = info; } data->mCurrentHost = mCandidateHostsInfo->GetCurrentHost(); } if (data->mCurrentHost.empty()) { if (mCandidateHostsInfo->IsInitialized()) { GetRegionConcurrencyLimiter(mRegion)->OnFail(chrono::system_clock::now()); } *errMsg = "failed to get available host"; *keepItem = true; return false; } #else static string host = mProject + "." + mEndpoint; data->mCurrentHost = host; #endif switch (mTelemetryType) { case sls_logs::SLS_TELEMETRY_TYPE_LOGS: req = CreatePostLogStoreLogsRequest(accessKeyId, accessKeySecret, type, data); break; case sls_logs::SLS_TELEMETRY_TYPE_METRICS: req = CreatePostMetricStoreLogsRequest(accessKeyId, accessKeySecret, type, data); break; case sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS: case sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS: case sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES: req = CreatePostAPMBackendRequest(accessKeyId, accessKeySecret, type, data, mSubpath); break; default: break; } return true; } void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item) { ADD_COUNTER(mSendDoneCnt, 1); SLSResponse slsResponse = ParseHttpResponse(response); auto data = static_cast<SLSSenderQueueItem*>(item); string configName = HasContext() ? GetContext().GetConfigName() : ""; string hostname = data->mCurrentHost; bool isProfileData = GetProfileSender()->IsProfileData(mRegion, mProject, data->mLogstore); int32_t curTime = time(NULL); auto curSystemTime = chrono::system_clock::now(); SendResult sendResult = SEND_OK; if (slsResponse.mStatusCode == 200) { auto& cpt = data->mExactlyOnceCheckpoint; if (cpt) { cpt->Commit(); cpt->IncreaseSequenceID(); } LOG_DEBUG( sLogger, ("send data to sls succeeded, item address", item)("request id", slsResponse.mRequestId)( "config", configName)("region", mRegion)("project", mProject)("logstore", data->mLogstore)( "response time", ToString(chrono::duration_cast<chrono::milliseconds>(curSystemTime - item->mLastSendTime).count()) + "ms")( "total send time", ToString(chrono::duration_cast<chrono::milliseconds>(curSystemTime - item->mFirstEnqueTime).count()) + "ms")("try cnt", data->mTryCnt)("endpoint", data->mCurrentHost)("is profile data", isProfileData)); GetRegionConcurrencyLimiter(mRegion)->OnSuccess(curSystemTime); GetProjectConcurrencyLimiter(mProject)->OnSuccess(curSystemTime); GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(curSystemTime); SenderQueueManager::GetInstance()->DecreaseConcurrencyLimiterInSendingCnt(item->mQueueKey); ADD_COUNTER(mSuccessCnt, 1); DealSenderQueueItemAfterSend(item, false); } else { OperationOnFail operation; sendResult = ConvertErrorCode(slsResponse.mErrorCode); ostringstream failDetail, suggestion; if (sendResult == SEND_NETWORK_ERROR || sendResult == SEND_SERVER_ERROR) { if (sendResult == SEND_NETWORK_ERROR) { failDetail << "network error"; ADD_COUNTER(mNetworkErrorCnt, 1); } else { failDetail << "server error"; ADD_COUNTER(mServerErrorCnt, 1); } suggestion << "check network connection to endpoint"; #ifdef __ENTERPRISE__ if (data->mRealIpFlag) { // connect refused, use vip directly failDetail << ", real ip may be stale, force update"; EnterpriseSLSClientManager::GetInstance()->UpdateOutdatedRealIpRegions(mRegion); } #endif operation = data->mBufferOrNot ? OperationOnFail::RETRY_LATER : OperationOnFail::DISCARD; GetRegionConcurrencyLimiter(mRegion)->OnFail(curSystemTime); GetProjectConcurrencyLimiter(mProject)->OnSuccess(curSystemTime); GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(curSystemTime); } else if (sendResult == SEND_QUOTA_EXCEED) { if (slsResponse.mErrorCode == LOGE_SHARD_WRITE_QUOTA_EXCEED) { failDetail << "shard write quota exceed"; suggestion << "Split logstore shards. https://help.aliyun.com/zh/sls/user-guide/expansion-of-resources"; GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnFail(curSystemTime); GetRegionConcurrencyLimiter(mRegion)->OnSuccess(curSystemTime); GetProjectConcurrencyLimiter(mProject)->OnSuccess(curSystemTime); ADD_COUNTER(mShardWriteQuotaErrorCnt, 1); } else { failDetail << "project write quota exceed"; suggestion << "Submit quota modification request. " "https://help.aliyun.com/zh/sls/user-guide/expansion-of-resources"; GetProjectConcurrencyLimiter(mProject)->OnFail(curSystemTime); GetRegionConcurrencyLimiter(mRegion)->OnSuccess(curSystemTime); GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(curSystemTime); ADD_COUNTER(mProjectQuotaErrorCnt, 1); } AlarmManager::GetInstance()->SendAlarm(SEND_QUOTA_EXCEED_ALARM, "error_code: " + slsResponse.mErrorCode + ", error_message: " + slsResponse.mErrorMsg + ", request_id:" + slsResponse.mRequestId, mRegion, mProject, mContext ? mContext->GetConfigName() : "", data->mLogstore); operation = OperationOnFail::RETRY_LATER; } else if (sendResult == SEND_UNAUTHORIZED) { failDetail << "write unauthorized"; suggestion << "check access keys provided"; operation = OperationOnFail::RETRY_LATER; ADD_COUNTER(mUnauthErrorCnt, 1); } else if (sendResult == SEND_PARAMETER_INVALID) { failDetail << "invalid parameters"; suggestion << "check input parameters"; operation = DefaultOperation(item->mTryCnt); ADD_COUNTER(mParamsErrorCnt, 1); } else if (sendResult == SEND_INVALID_SEQUENCE_ID) { failDetail << "invalid exactly-once sequence id"; ADD_COUNTER(mSequenceIDErrorCnt, 1); do { auto& cpt = data->mExactlyOnceCheckpoint; if (!cpt) { failDetail << ", unexpected result when exactly once checkpoint is not found"; suggestion << "report bug"; AlarmManager::GetInstance()->SendAlarm( EXACTLY_ONCE_ALARM, "drop exactly once log group because of invalid sequence ID, request id:" + slsResponse.mRequestId, mRegion, mProject, mContext ? mContext->GetConfigName() : "", data->mLogstore); operation = OperationOnFail::DISCARD; break; } // Because hash key is generated by UUID library, we consider that // the possibility of hash key conflict is very low, so data is // dropped here. cpt->Commit(); failDetail << ", drop exactly once log group and commit checkpoint" << " checkpointKey:" << cpt->key << " checkpoint:" << cpt->data.DebugString(); suggestion << "no suggestion"; AlarmManager::GetInstance()->SendAlarm( EXACTLY_ONCE_ALARM, "drop exactly once log group because of invalid sequence ID, cpt:" + cpt->key + ", data:" + cpt->data.DebugString() + "request id:" + slsResponse.mRequestId, mRegion, mProject, mContext ? mContext->GetConfigName() : "", data->mLogstore); operation = OperationOnFail::DISCARD; cpt->IncreaseSequenceID(); } while (0); } else if (AppConfig::GetInstance()->EnableLogTimeAutoAdjust() && LOGE_REQUEST_TIME_EXPIRED == slsResponse.mErrorCode) { failDetail << "write request expired, will retry"; suggestion << "check local system time"; operation = OperationOnFail::RETRY_IMMEDIATELY; ADD_COUNTER(mRequestExpiredErrorCnt, 1); } else { failDetail << "other error"; suggestion << "no suggestion"; // when unknown error such as SignatureNotMatch happens, we should retry several times // first time, we will retry immediately // then we record error and retry latter // when retry times > unknow_error_try_max, we will drop this data operation = DefaultOperation(item->mTryCnt); ADD_COUNTER(mOtherErrorCnt, 1); } if (chrono::duration_cast<chrono::seconds>(curSystemTime - item->mFirstEnqueTime).count() > INT32_FLAG(discard_send_fail_interval)) { operation = OperationOnFail::DISCARD; } if (isProfileData && data->mTryCnt >= static_cast<uint32_t>(INT32_FLAG(profile_data_send_retrytimes))) { operation = OperationOnFail::DISCARD; } #define LOG_PATTERN \ ("failed to send request", failDetail.str())("operation", GetOperationString(operation))("suggestion", \ suggestion.str())( \ "item address", item)("request id", slsResponse.mRequestId)("status code", slsResponse.mStatusCode)( \ "error code", slsResponse.mErrorCode)("errMsg", slsResponse.mErrorMsg)("config", configName)( \ "region", mRegion)("project", mProject)("logstore", data->mLogstore)("try cnt", data->mTryCnt)( \ "response time", \ ToString(chrono::duration_cast<chrono::seconds>(curSystemTime - data->mLastSendTime).count()) \ + "ms")("total send time", \ ToString(chrono::duration_cast<chrono::seconds>(curSystemTime - data->mFirstEnqueTime).count()) \ + "ms")("endpoint", data->mCurrentHost)("is profile data", isProfileData) switch (operation) { case OperationOnFail::RETRY_IMMEDIATELY: ++item->mTryCnt; FlusherRunner::GetInstance()->PushToHttpSink(item, false); break; case OperationOnFail::RETRY_LATER: if (slsResponse.mErrorCode == LOGE_REQUEST_TIMEOUT || curTime - data->mLastLogWarningTime > ON_FAIL_LOG_WARNING_INTERVAL_SECOND) { LOG_WARNING(sLogger, LOG_PATTERN); data->mLastLogWarningTime = curTime; } SenderQueueManager::GetInstance()->DecreaseConcurrencyLimiterInSendingCnt(item->mQueueKey); DealSenderQueueItemAfterSend(item, true); break; case OperationOnFail::DISCARD: ADD_COUNTER(mDiscardCnt, 1); default: LOG_WARNING(sLogger, LOG_PATTERN); if (!isProfileData) { AlarmManager::GetInstance()->SendAlarm( SEND_DATA_FAIL_ALARM, "failed to send request: " + failDetail.str() + "\toperation: " + GetOperationString(operation) + "\trequestId: " + slsResponse.mRequestId + "\tstatusCode: " + ToString(slsResponse.mStatusCode) + "\terrorCode: " + slsResponse.mErrorCode + "\terrorMessage: " + slsResponse.mErrorMsg + "\tconfig: " + configName + "\tendpoint: " + data->mCurrentHost, mRegion, mProject, mContext ? mContext->GetConfigName() : "", data->mLogstore); } SenderQueueManager::GetInstance()->DecreaseConcurrencyLimiterInSendingCnt(item->mQueueKey); DealSenderQueueItemAfterSend(item, false); break; } } #ifdef __ENTERPRISE__ bool hasNetworkError = sendResult == SEND_NETWORK_ERROR; EnterpriseSLSClientManager::GetInstance()->UpdateHostStatus( mProject, mCandidateHostsInfo->GetMode(), hostname, !hasNetworkError); mCandidateHostsInfo->SelectBestHost(); if (!hasNetworkError) { bool hasAuthError = sendResult == SEND_UNAUTHORIZED; EnterpriseSLSClientManager::GetInstance()->UpdateAccessKeyStatus(mAliuid, !hasAuthError); EnterpriseSLSClientManager::GetInstance()->UpdateProjectAnonymousWriteStatus(mProject, !hasAuthError); } #endif } bool FlusherSLS::Send(string&& data, const string& shardHashKey, const string& logstore) { string compressedData; if (mCompressor) { string errorMsg; if (!mCompressor->DoCompress(data, compressedData, errorMsg)) { LOG_WARNING(mContext->GetLogger(), ("failed to compress data", errorMsg)("action", "discard data")("plugin", sName)("config", mContext->GetConfigName())); mContext->GetAlarm().SendAlarm(COMPRESS_FAIL_ALARM, "failed to compress data: " + errorMsg + "\taction: discard data\tplugin: " + sName + "\tconfig: " + mContext->GetConfigName(), mContext->GetRegion(), mContext->GetProjectName(), mContext->GetConfigName(), mContext->GetLogstoreName()); return false; } } else { compressedData = data; } QueueKey key = mQueueKey; if (!HasContext()) { key = QueueKeyManager::GetInstance()->GetKey(mProject + "-" + mLogstore); if (SenderQueueManager::GetInstance()->GetQueue(key) == nullptr) { CollectionPipelineContext ctx; SenderQueueManager::GetInstance()->CreateQueue( key, "", ctx, std::unordered_map<std::string, std::shared_ptr<ConcurrencyLimiter>>()); } } return Flusher::PushToQueue(make_unique<SLSSenderQueueItem>(std::move(compressedData), data.size(), this, key, logstore.empty() ? mLogstore : logstore, RawDataType::EVENT_GROUP, shardHashKey)); } void FlusherSLS::GenerateGoPlugin(const Json::Value& config, Json::Value& res) const { Json::Value detail(Json::objectValue); for (auto itr = config.begin(); itr != config.end(); ++itr) { if (sNativeParam.find(itr.name()) == sNativeParam.end() && itr.name() != "Type") { detail[itr.name()] = *itr; } } if (mContext->IsFlushingThroughGoPipeline()) { Json::Value plugin(Json::objectValue); plugin["type"] = CollectionPipeline::GenPluginTypeWithID("flusher_sls", mContext->GetPipeline().GetNowPluginID()); plugin["detail"] = detail; res["flushers"].append(plugin); } } bool FlusherSLS::SerializeAndPush(PipelineEventGroup&& group) { string serializedData, compressedData; BatchedEvents g(std::move(group.MutableEvents()), std::move(group.GetSizedTags()), std::move(group.GetSourceBuffer()), group.GetMetadata(EventGroupMetaKey::SOURCE_ID), std::move(group.GetExactlyOnceCheckpoint())); AddPackId(g); string errorMsg; if (!mGroupSerializer->DoSerialize(std::move(g), serializedData, errorMsg)) { LOG_WARNING(mContext->GetLogger(), ("failed to serialize event group", errorMsg)("action", "discard data")("plugin", sName)("config", mContext->GetConfigName())); mContext->GetAlarm().SendAlarm(SERIALIZE_FAIL_ALARM, "failed to serialize event group: " + errorMsg + "\taction: discard data\tplugin: " + sName + "\tconfig: " + mContext->GetConfigName(), mContext->GetRegion(), mContext->GetProjectName(), mContext->GetConfigName(), mContext->GetLogstoreName()); return false; } if (mCompressor) { if (!mCompressor->DoCompress(serializedData, compressedData, errorMsg)) { LOG_WARNING(mContext->GetLogger(), ("failed to compress event group", errorMsg)("action", "discard data")("plugin", sName)("config", mContext->GetConfigName())); mContext->GetAlarm().SendAlarm(COMPRESS_FAIL_ALARM, "failed to compress event group: " + errorMsg + "\taction: discard data\tplugin: " + sName + "\tconfig: " + mContext->GetConfigName(), mContext->GetRegion(), mContext->GetProjectName(), mContext->GetConfigName(), mContext->GetLogstoreName()); return false; } } else { compressedData = serializedData; } // must create a tmp, because eoo checkpoint is moved in second param auto fbKey = g.mExactlyOnceCheckpoint->fbKey; return PushToQueue(fbKey, make_unique<SLSSenderQueueItem>(std::move(compressedData), serializedData.size(), this, fbKey, mLogstore, RawDataType::EVENT_GROUP, g.mExactlyOnceCheckpoint->data.hash_key(), std::move(g.mExactlyOnceCheckpoint), false)); } bool FlusherSLS::SerializeAndPush(BatchedEventsList&& groupList) { if (groupList.empty()) { return true; } vector<CompressedLogGroup> compressedLogGroups; string shardHashKey, serializedData, compressedData; size_t packageSize = 0; bool enablePackageList = groupList.size() > 1; bool allSucceeded = true; for (auto& group : groupList) { if (!mShardHashKeys.empty()) { shardHashKey = GetShardHashKey(group); } AddPackId(group); string errorMsg; if (!mGroupSerializer->DoSerialize(std::move(group), serializedData, errorMsg)) { LOG_WARNING(mContext->GetLogger(), ("failed to serialize event group", errorMsg)("action", "discard data")("plugin", sName)("config", mContext->GetConfigName())); mContext->GetAlarm().SendAlarm(SERIALIZE_FAIL_ALARM, "failed to serialize event group: " + errorMsg + "\taction: discard data\tplugin: " + sName + "\tconfig: " + mContext->GetConfigName(), mContext->GetRegion(), mContext->GetProjectName(), mContext->GetConfigName(), mContext->GetLogstoreName()); allSucceeded = false; continue; } if (mCompressor) { if (!mCompressor->DoCompress(serializedData, compressedData, errorMsg)) { LOG_WARNING(mContext->GetLogger(), ("failed to compress event group", errorMsg)("action", "discard data")("plugin", sName)("config", mContext->GetConfigName())); mContext->GetAlarm().SendAlarm(COMPRESS_FAIL_ALARM, "failed to compress event group: " + errorMsg + "\taction: discard data\tplugin: " + sName + "\tconfig: " + mContext->GetConfigName(), mContext->GetRegion(), mContext->GetProjectName(), mContext->GetConfigName(), mContext->GetLogstoreName()); allSucceeded = false; continue; } } else { compressedData = serializedData; } if (enablePackageList) { packageSize += serializedData.size(); compressedLogGroups.emplace_back(std::move(compressedData), serializedData.size()); } else { if (group.mExactlyOnceCheckpoint) { // must create a tmp, because eoo checkpoint is moved in second param auto fbKey = group.mExactlyOnceCheckpoint->fbKey; allSucceeded = PushToQueue(fbKey, make_unique<SLSSenderQueueItem>(std::move(compressedData), serializedData.size(), this, fbKey, mLogstore, RawDataType::EVENT_GROUP, group.mExactlyOnceCheckpoint->data.hash_key(), std::move(group.mExactlyOnceCheckpoint), false)) && allSucceeded; } else { allSucceeded = Flusher::PushToQueue(make_unique<SLSSenderQueueItem>(std::move(compressedData), serializedData.size(), this, mQueueKey, mLogstore, RawDataType::EVENT_GROUP, shardHashKey)) && allSucceeded; } } } if (enablePackageList) { string errorMsg; mGroupListSerializer->DoSerialize(std::move(compressedLogGroups), serializedData, errorMsg); allSucceeded = Flusher::PushToQueue(make_unique<SLSSenderQueueItem>( std::move(serializedData), packageSize, this, mQueueKey, mLogstore, RawDataType::EVENT_GROUP_LIST)) && allSucceeded; } return allSucceeded; } bool FlusherSLS::SerializeAndPush(vector<BatchedEventsList>&& groupLists) { bool allSucceeded = true; for (auto& groupList : groupLists) { allSucceeded = SerializeAndPush(std::move(groupList)) && allSucceeded; } return allSucceeded; } bool FlusherSLS::PushToQueue(QueueKey key, unique_ptr<SenderQueueItem>&& item, uint32_t retryTimes) { const string& str = QueueKeyManager::GetInstance()->GetName(key); for (size_t i = 0; i < retryTimes; ++i) { int rst = SenderQueueManager::GetInstance()->PushQueue(key, std::move(item)); if (rst == 0) { return true; } if (rst == 2) { // should not happen LOG_ERROR(sLogger, ("failed to push data to sender queue", "queue not found")("action", "discard data")("config-flusher-dst", str)); AlarmManager::GetInstance()->SendAlarm( DISCARD_DATA_ALARM, "failed to push data to sender queue: queue not found\taction: discard data\tconfig-flusher-dst" + str); return false; } if (i % 100 == 0) { LOG_WARNING(sLogger, ("push attempts to sender queue continuously failed for the past second", "retry again")("config-flusher-dst", str)); } this_thread::sleep_for(chrono::milliseconds(10)); } LOG_WARNING( sLogger, ("failed to push data to sender queue", "queue full")("action", "discard data")("config-flusher-dst", str)); AlarmManager::GetInstance()->SendAlarm( DISCARD_DATA_ALARM, "failed to push data to sender queue: queue full\taction: discard data\tconfig-flusher-dst" + str); return false; } string FlusherSLS::GetShardHashKey(const BatchedEvents& g) const { // TODO: improve performance string key; for (size_t i = 0; i < mShardHashKeys.size(); ++i) { for (auto& item : g.mTags.mInner) { if (item.first == mShardHashKeys[i]) { key += item.second.to_string(); break; } } if (i != mShardHashKeys.size() - 1) { key += "_"; } } return CalcMD5(key); } void FlusherSLS::AddPackId(BatchedEvents& g) const { string packIdPrefixStr = g.mPackIdPrefix.to_string(); int64_t packidPrefix = HashString(packIdPrefixStr); int64_t packSeq = PackIdManager::GetInstance()->GetAndIncPackSeq( HashString(packIdPrefixStr + "_" + mProject + "_" + mLogstore)); auto packId = g.mSourceBuffers[0]->CopyString(ToHexString(packidPrefix) + "-" + ToHexString(packSeq)); g.mTags.Insert(LOG_RESERVED_KEY_PACKAGE_ID, StringView(packId.data, packId.size)); } unique_ptr<HttpSinkRequest> FlusherSLS::CreatePostLogStoreLogsRequest(const string& accessKeyId, const string& accessKeySecret, SLSClientManager::AuthType type, SLSSenderQueueItem* item) const { optional<uint64_t> seqId; if (item->mExactlyOnceCheckpoint) { seqId = item->mExactlyOnceCheckpoint->data.sequence_id(); } string path, query; map<string, string> header; PreparePostLogStoreLogsRequest(accessKeyId, accessKeySecret, type, item->mCurrentHost, item->mRealIpFlag, mProject, item->mLogstore, CompressTypeToString(mCompressor->GetCompressType()), item->mType, item->mData, item->mRawSize, item->mShardHashKey, seqId, path, query, header); bool httpsFlag = SLSClientManager::GetInstance()->UsingHttps(mRegion); return make_unique<HttpSinkRequest>(HTTP_POST, httpsFlag, item->mCurrentHost, httpsFlag ? 443 : 80, path, query, header, item->mData, item, INT32_FLAG(default_http_request_timeout_sec), 1, CurlSocket(INT32_FLAG(sls_request_dscp))); } unique_ptr<HttpSinkRequest> FlusherSLS::CreatePostMetricStoreLogsRequest(const string& accessKeyId, const string& accessKeySecret, SLSClientManager::AuthType type, SLSSenderQueueItem* item) const { string path; map<string, string> header; PreparePostMetricStoreLogsRequest(accessKeyId, accessKeySecret, type, item->mCurrentHost, item->mRealIpFlag, mProject, item->mLogstore, CompressTypeToString(mCompressor->GetCompressType()), item->mData, item->mRawSize, path, header); bool httpsFlag = SLSClientManager::GetInstance()->UsingHttps(mRegion); return make_unique<HttpSinkRequest>(HTTP_POST, httpsFlag, item->mCurrentHost, httpsFlag ? 443 : 80, path, "", header, item->mData, item, INT32_FLAG(default_http_request_timeout_sec), 1, CurlSocket(INT32_FLAG(sls_request_dscp))); } unique_ptr<HttpSinkRequest> FlusherSLS::CreatePostAPMBackendRequest(const string& accessKeyId, const string& accessKeySecret, SLSClientManager::AuthType type, SLSSenderQueueItem* item, const std::string& subPath) const { string query; map<string, string> header; PreparePostAPMBackendRequest(accessKeyId, accessKeySecret, type, item->mCurrentHost, item->mRealIpFlag, mProject, item->mLogstore, CompressTypeToString(mCompressor->GetCompressType()), item->mType, item->mData, item->mRawSize, mSubpath, query, header); bool httpsFlag = SLSClientManager::GetInstance()->UsingHttps(mRegion); return make_unique<HttpSinkRequest>(HTTP_POST, httpsFlag, item->mCurrentHost, httpsFlag ? 443 : 80, subPath, "", header, item->mData, item, INT32_FLAG(default_http_request_timeout_sec), 1, CurlSocket(INT32_FLAG(sls_request_dscp))); } sls_logs::SlsCompressType ConvertCompressType(CompressType type) { sls_logs::SlsCompressType compressType = sls_logs::SLS_CMP_NONE; switch (type) { case CompressType::LZ4: compressType = sls_logs::SLS_CMP_LZ4; break; case CompressType::ZSTD: compressType = sls_logs::SLS_CMP_ZSTD; break; default: break; } return compressType; } } // namespace logtail