in core/plugin/flusher/sls/FlusherSLS.cpp [689:905]
/**
 * Completion callback for one HTTP send of an SLS sender queue item.
 *
 * On HTTP 200:
 * - commits the item's exactly-once checkpoint (if any) and advances its sequence id;
 * - reports success to the region/project/logstore concurrency limiters;
 * - passes the item to DealSenderQueueItemAfterSend(item, false).
 *
 * Otherwise:
 * - classifies the SLS error code via ConvertErrorCode();
 * - updates the matching error counter and concurrency limiters;
 * - picks an OperationOnFail:
 *   - RETRY_IMMEDIATELY: bump the try count and push the item back to the HTTP sink;
 *   - RETRY_LATER: rate-limited warning, then DealSenderQueueItemAfterSend(item, true);
 *   - DISCARD (or any other value): warning, plus an alarm for non-profile data, then
 *     DealSenderQueueItemAfterSend(item, false).
 *
 * Whatever the per-error choice, an item first enqueued more than discard_send_fail_interval
 * seconds ago, or profile data that has reached profile_data_send_retrytimes tries, is discarded.
 *
 * @param response HTTP response produced for this request.
 * @param item     The item that was sent; it is treated as an SLSSenderQueueItem.
 */
void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item) {
    ADD_COUNTER(mSendDoneCnt, 1);
    SLSResponse slsResponse = ParseHttpResponse(response);
    auto data = static_cast<SLSSenderQueueItem*>(item);
    string configName = HasContext() ? GetContext().GetConfigName() : "";
    // Copied up front: the enterprise host-status update at the end of this function runs after the item
    // has been handed to DealSenderQueueItemAfterSend(), after which it presumably may no longer be valid.
    string hostname = data->mCurrentHost;
    bool isProfileData = GetProfileSender()->IsProfileData(mRegion, mProject, data->mLogstore);
    int32_t curTime = time(nullptr);
    auto curSystemTime = chrono::system_clock::now();
    SendResult sendResult = SEND_OK;
    if (slsResponse.mStatusCode == 200) {
        auto& cpt = data->mExactlyOnceCheckpoint;
        if (cpt) {
            cpt->Commit();
            cpt->IncreaseSequenceID();
        }
        LOG_DEBUG(
            sLogger,
            ("send data to sls succeeded, item address", item)("request id", slsResponse.mRequestId)(
                "config", configName)("region", mRegion)("project", mProject)("logstore", data->mLogstore)(
                "response time",
                ToString(chrono::duration_cast<chrono::milliseconds>(curSystemTime - item->mLastSendTime).count())
                    + "ms")(
                "total send time",
                ToString(chrono::duration_cast<chrono::milliseconds>(curSystemTime - item->mFirstEnqueTime).count())
                    + "ms")("try cnt", data->mTryCnt)("endpoint", data->mCurrentHost)("is profile data",
                                                                                        isProfileData));
        GetRegionConcurrencyLimiter(mRegion)->OnSuccess(curSystemTime);
        GetProjectConcurrencyLimiter(mProject)->OnSuccess(curSystemTime);
        GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(curSystemTime);
        SenderQueueManager::GetInstance()->DecreaseConcurrencyLimiterInSendingCnt(item->mQueueKey);
        ADD_COUNTER(mSuccessCnt, 1);
        DealSenderQueueItemAfterSend(item, false);
    } else {
        OperationOnFail operation;
        sendResult = ConvertErrorCode(slsResponse.mErrorCode);
        ostringstream failDetail, suggestion;
        if (sendResult == SEND_NETWORK_ERROR || sendResult == SEND_SERVER_ERROR) {
            if (sendResult == SEND_NETWORK_ERROR) {
                failDetail << "network error";
                ADD_COUNTER(mNetworkErrorCnt, 1);
            } else {
                failDetail << "server error";
                ADD_COUNTER(mServerErrorCnt, 1);
            }
            suggestion << "check network connection to endpoint";
#ifdef __ENTERPRISE__
            if (data->mRealIpFlag) {
                // connect refused, use vip directly
                failDetail << ", real ip may be stale, force update";
                EnterpriseSLSClientManager::GetInstance()->UpdateOutdatedRealIpRegions(mRegion);
            }
#endif
            // Only items flagged for buffering are kept for a later retry.
            operation = data->mBufferOrNot ? OperationOnFail::RETRY_LATER : OperationOnFail::DISCARD;
            // Network/server failures are charged to the region limiter only; project and logstore
            // limiters are told the request succeeded.
            GetRegionConcurrencyLimiter(mRegion)->OnFail(curSystemTime);
            GetProjectConcurrencyLimiter(mProject)->OnSuccess(curSystemTime);
            GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(curSystemTime);
        } else if (sendResult == SEND_QUOTA_EXCEED) {
            // Charge the failure to the limiter whose quota was exceeded (logstore shard vs. project).
            if (slsResponse.mErrorCode == LOGE_SHARD_WRITE_QUOTA_EXCEED) {
                failDetail << "shard write quota exceed";
                suggestion << "Split logstore shards. https://help.aliyun.com/zh/sls/user-guide/expansion-of-resources";
                GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnFail(curSystemTime);
                GetRegionConcurrencyLimiter(mRegion)->OnSuccess(curSystemTime);
                GetProjectConcurrencyLimiter(mProject)->OnSuccess(curSystemTime);
                ADD_COUNTER(mShardWriteQuotaErrorCnt, 1);
            } else {
                failDetail << "project write quota exceed";
                suggestion << "Submit quota modification request. "
                              "https://help.aliyun.com/zh/sls/user-guide/expansion-of-resources";
                GetProjectConcurrencyLimiter(mProject)->OnFail(curSystemTime);
                GetRegionConcurrencyLimiter(mRegion)->OnSuccess(curSystemTime);
                GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(curSystemTime);
                ADD_COUNTER(mProjectQuotaErrorCnt, 1);
            }
            AlarmManager::GetInstance()->SendAlarm(SEND_QUOTA_EXCEED_ALARM,
                                                   "error_code: " + slsResponse.mErrorCode
                                                       + ", error_message: " + slsResponse.mErrorMsg
                                                       + ", request_id:" + slsResponse.mRequestId,
                                                   mRegion,
                                                   mProject,
                                                   mContext ? mContext->GetConfigName() : "",
                                                   data->mLogstore);
            operation = OperationOnFail::RETRY_LATER;
        } else if (sendResult == SEND_UNAUTHORIZED) {
            failDetail << "write unauthorized";
            suggestion << "check access keys provided";
            operation = OperationOnFail::RETRY_LATER;
            ADD_COUNTER(mUnauthErrorCnt, 1);
        } else if (sendResult == SEND_PARAMETER_INVALID) {
            failDetail << "invalid parameters";
            suggestion << "check input parameters";
            operation = DefaultOperation(item->mTryCnt);
            ADD_COUNTER(mParamsErrorCnt, 1);
        } else if (sendResult == SEND_INVALID_SEQUENCE_ID) {
            failDetail << "invalid exactly-once sequence id";
            ADD_COUNTER(mSequenceIDErrorCnt, 1);
            auto& cpt = data->mExactlyOnceCheckpoint;
            if (!cpt) {
                failDetail << ", unexpected result when exactly once checkpoint is not found";
                suggestion << "report bug";
                AlarmManager::GetInstance()->SendAlarm(
                    EXACTLY_ONCE_ALARM,
                    "drop exactly once log group because of invalid sequence ID, request id:"
                        + slsResponse.mRequestId,
                    mRegion,
                    mProject,
                    mContext ? mContext->GetConfigName() : "",
                    data->mLogstore);
                operation = OperationOnFail::DISCARD;
            } else {
                // Because hash key is generated by UUID library, we consider that
                // the possibility of hash key conflict is very low, so data is
                // dropped here.
                cpt->Commit();
                failDetail << ", drop exactly once log group and commit checkpoint"
                           << " checkpointKey:" << cpt->key << " checkpoint:" << cpt->data.DebugString();
                suggestion << "no suggestion";
                AlarmManager::GetInstance()->SendAlarm(
                    EXACTLY_ONCE_ALARM,
                    "drop exactly once log group because of invalid sequence ID, cpt:" + cpt->key
                        + ", data:" + cpt->data.DebugString() + ", request id:" + slsResponse.mRequestId,
                    mRegion,
                    mProject,
                    mContext ? mContext->GetConfigName() : "",
                    data->mLogstore);
                operation = OperationOnFail::DISCARD;
                cpt->IncreaseSequenceID();
            }
        } else if (AppConfig::GetInstance()->EnableLogTimeAutoAdjust()
                   && LOGE_REQUEST_TIME_EXPIRED == slsResponse.mErrorCode) {
            failDetail << "write request expired, will retry";
            suggestion << "check local system time";
            operation = OperationOnFail::RETRY_IMMEDIATELY;
            ADD_COUNTER(mRequestExpiredErrorCnt, 1);
        } else {
            failDetail << "other error";
            suggestion << "no suggestion";
            // when unknown error such as SignatureNotMatch happens, we should retry several times
            // first time, we will retry immediately
            // then we record error and retry latter
            // when retry times > unknow_error_try_max, we will drop this data
            operation = DefaultOperation(item->mTryCnt);
            ADD_COUNTER(mOtherErrorCnt, 1);
        }
        // Global overrides: give up on items that have been failing for too long, and cap retries of
        // profile data regardless of the error class chosen above.
        if (chrono::duration_cast<chrono::seconds>(curSystemTime - item->mFirstEnqueTime).count()
            > INT32_FLAG(discard_send_fail_interval)) {
            operation = OperationOnFail::DISCARD;
        }
        if (isProfileData && data->mTryCnt >= static_cast<uint32_t>(INT32_FLAG(profile_data_send_retrytimes))) {
            operation = OperationOnFail::DISCARD;
        }
// Log fields shared by the failure warnings below; durations are reported in milliseconds to match
// the success-path debug log. Undefined right after the switch so it cannot leak out of this function.
#define LOG_PATTERN \
    ("failed to send request", failDetail.str())("operation", GetOperationString(operation))( \
        "suggestion", suggestion.str())("item address", item)("request id", slsResponse.mRequestId)( \
        "status code", slsResponse.mStatusCode)("error code", slsResponse.mErrorCode)( \
        "errMsg", slsResponse.mErrorMsg)("config", configName)("region", mRegion)("project", mProject)( \
        "logstore", data->mLogstore)("try cnt", data->mTryCnt)( \
        "response time", \
        ToString(chrono::duration_cast<chrono::milliseconds>(curSystemTime - data->mLastSendTime).count()) \
            + "ms")( \
        "total send time", \
        ToString(chrono::duration_cast<chrono::milliseconds>(curSystemTime - data->mFirstEnqueTime).count()) \
            + "ms")("endpoint", data->mCurrentHost)("is profile data", isProfileData)
        switch (operation) {
            case OperationOnFail::RETRY_IMMEDIATELY:
                // The request is re-sent straight away, so the in-sending count is not decreased here.
                ++item->mTryCnt;
                FlusherRunner::GetInstance()->PushToHttpSink(item, false);
                break;
            case OperationOnFail::RETRY_LATER:
                // Rate-limit warnings per item, except for request timeouts, which are always logged.
                if (slsResponse.mErrorCode == LOGE_REQUEST_TIMEOUT
                    || curTime - data->mLastLogWarningTime > ON_FAIL_LOG_WARNING_INTERVAL_SECOND) {
                    LOG_WARNING(sLogger, LOG_PATTERN);
                    data->mLastLogWarningTime = curTime;
                }
                SenderQueueManager::GetInstance()->DecreaseConcurrencyLimiterInSendingCnt(item->mQueueKey);
                DealSenderQueueItemAfterSend(item, true);
                break;
            case OperationOnFail::DISCARD:
                ADD_COUNTER(mDiscardCnt, 1);
                // fall through - discarded items share the logging/alarm/release path of default
            default:
                LOG_WARNING(sLogger, LOG_PATTERN);
                if (!isProfileData) {
                    AlarmManager::GetInstance()->SendAlarm(
                        SEND_DATA_FAIL_ALARM,
                        "failed to send request: " + failDetail.str() + "\toperation: " + GetOperationString(operation)
                            + "\trequestId: " + slsResponse.mRequestId
                            + "\tstatusCode: " + ToString(slsResponse.mStatusCode)
                            + "\terrorCode: " + slsResponse.mErrorCode + "\terrorMessage: " + slsResponse.mErrorMsg
                            + "\tconfig: " + configName + "\tendpoint: " + data->mCurrentHost,
                        mRegion,
                        mProject,
                        mContext ? mContext->GetConfigName() : "",
                        data->mLogstore);
                }
                SenderQueueManager::GetInstance()->DecreaseConcurrencyLimiterInSendingCnt(item->mQueueKey);
                DealSenderQueueItemAfterSend(item, false);
                break;
        }
#undef LOG_PATTERN
    }
#ifdef __ENTERPRISE__
    // Uses the copied `hostname` rather than `data`, since the item has already been handed off above.
    bool hasNetworkError = sendResult == SEND_NETWORK_ERROR;
    EnterpriseSLSClientManager::GetInstance()->UpdateHostStatus(
        mProject, mCandidateHostsInfo->GetMode(), hostname, !hasNetworkError);
    mCandidateHostsInfo->SelectBestHost();
    if (!hasNetworkError) {
        bool hasAuthError = sendResult == SEND_UNAUTHORIZED;
        EnterpriseSLSClientManager::GetInstance()->UpdateAccessKeyStatus(mAliuid, !hasAuthError);
        EnterpriseSLSClientManager::GetInstance()->UpdateProjectAnonymousWriteStatus(mProject, !hasAuthError);
    }
#endif
}