void FlusherSLS::OnSendDone()

in core/plugin/flusher/sls/FlusherSLS.cpp [689:905]


// Handles the HTTP response of one send attempt for `item`.
//
// The response is parsed into an SLSResponse, then:
//  - status code 200: the exactly-once checkpoint (if any) is committed and its sequence ID advanced, success is
//    reported to the region/project/logstore concurrency limiters, and the item is passed to
//    DealSenderQueueItemAfterSend(item, false);
//  - anything else: the error code is mapped to a SendResult, an OperationOnFail (retry immediately / retry later /
//    discard) is chosen, the matching counters, limiters and alarms are updated, and the operation is carried out.
// When built with __ENTERPRISE__, host, access key and anonymous-write status are updated from the outcome.
//
// @param response  raw HTTP response of the send request.
// @param item      the queue item that was sent; it is downcast to SLSSenderQueueItem with an unchecked
//                  static_cast, so it must really be one.
void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item) {
    ADD_COUNTER(mSendDoneCnt, 1);
    SLSResponse slsResponse = ParseHttpResponse(response);

    auto data = static_cast<SLSSenderQueueItem*>(item);
    string configName = HasContext() ? GetContext().GetConfigName() : "";
    // Copied up front and used after the item has been handed off below.
    // NOTE(review): presumably `item` may no longer be valid at that point - confirm.
    string hostname = data->mCurrentHost;
    bool isProfileData = GetProfileSender()->IsProfileData(mRegion, mProject, data->mLogstore);
    int32_t curTime = static_cast<int32_t>(time(nullptr));
    auto curSystemTime = chrono::system_clock::now();
    SendResult sendResult = SEND_OK;
    if (slsResponse.mStatusCode == 200) {
        auto& cpt = data->mExactlyOnceCheckpoint;
        if (cpt) {
            cpt->Commit();
            cpt->IncreaseSequenceID();
        }
        LOG_DEBUG(
            sLogger,
            ("send data to sls succeeded, item address", item)("request id", slsResponse.mRequestId)(
                "config", configName)("region", mRegion)("project", mProject)("logstore", data->mLogstore)(
                "response time",
                ToString(chrono::duration_cast<chrono::milliseconds>(curSystemTime - item->mLastSendTime).count())
                    + "ms")(
                "total send time",
                ToString(chrono::duration_cast<chrono::milliseconds>(curSystemTime - item->mFirstEnqueTime).count())
                    + "ms")("try cnt", data->mTryCnt)("endpoint", data->mCurrentHost)("is profile data",
                                                                                      isProfileData));
        GetRegionConcurrencyLimiter(mRegion)->OnSuccess(curSystemTime);
        GetProjectConcurrencyLimiter(mProject)->OnSuccess(curSystemTime);
        GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(curSystemTime);
        SenderQueueManager::GetInstance()->DecreaseConcurrencyLimiterInSendingCnt(item->mQueueKey);
        ADD_COUNTER(mSuccessCnt, 1);
        DealSenderQueueItemAfterSend(item, false);
    } else {
        // Every branch below assigns `operation`; the initial value only guards against a future branch that
        // forgets to, in which case retrying later does not lose data.
        OperationOnFail operation = OperationOnFail::RETRY_LATER;
        sendResult = ConvertErrorCode(slsResponse.mErrorCode);
        ostringstream failDetail, suggestion;
        if (sendResult == SEND_NETWORK_ERROR || sendResult == SEND_SERVER_ERROR) {
            if (sendResult == SEND_NETWORK_ERROR) {
                failDetail << "network error";
                ADD_COUNTER(mNetworkErrorCnt, 1);
            } else {
                failDetail << "server error";
                ADD_COUNTER(mServerErrorCnt, 1);
            }
            suggestion << "check network connection to endpoint";
#ifdef __ENTERPRISE__
            if (data->mRealIpFlag) {
                // connect refused, use vip directly
                failDetail << ", real ip may be stale, force update";
                EnterpriseSLSClientManager::GetInstance()->UpdateOutdatedRealIpRegions(mRegion);
            }
#endif
            operation = data->mBufferOrNot ? OperationOnFail::RETRY_LATER : OperationOnFail::DISCARD;
            // Only the region limiter is told about the failure; project and logstore limiters see a success.
            GetRegionConcurrencyLimiter(mRegion)->OnFail(curSystemTime);
            GetProjectConcurrencyLimiter(mProject)->OnSuccess(curSystemTime);
            GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(curSystemTime);
        } else if (sendResult == SEND_QUOTA_EXCEED) {
            if (slsResponse.mErrorCode == LOGE_SHARD_WRITE_QUOTA_EXCEED) {
                // Shard quota: only the logstore limiter is told about the failure.
                failDetail << "shard write quota exceed";
                suggestion << "Split logstore shards. https://help.aliyun.com/zh/sls/user-guide/expansion-of-resources";
                GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnFail(curSystemTime);
                GetRegionConcurrencyLimiter(mRegion)->OnSuccess(curSystemTime);
                GetProjectConcurrencyLimiter(mProject)->OnSuccess(curSystemTime);
                ADD_COUNTER(mShardWriteQuotaErrorCnt, 1);
            } else {
                // Any other quota error: only the project limiter is told about the failure.
                failDetail << "project write quota exceed";
                suggestion << "Submit quota modification request. "
                              "https://help.aliyun.com/zh/sls/user-guide/expansion-of-resources";
                GetProjectConcurrencyLimiter(mProject)->OnFail(curSystemTime);
                GetRegionConcurrencyLimiter(mRegion)->OnSuccess(curSystemTime);
                GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(curSystemTime);
                ADD_COUNTER(mProjectQuotaErrorCnt, 1);
            }
            AlarmManager::GetInstance()->SendAlarm(SEND_QUOTA_EXCEED_ALARM,
                                                   "error_code: " + slsResponse.mErrorCode
                                                       + ", error_message: " + slsResponse.mErrorMsg
                                                       + ", request_id:" + slsResponse.mRequestId,
                                                   mRegion,
                                                   mProject,
                                                   mContext ? mContext->GetConfigName() : "",
                                                   data->mLogstore);
            operation = OperationOnFail::RETRY_LATER;
        } else if (sendResult == SEND_UNAUTHORIZED) {
            failDetail << "write unauthorized";
            suggestion << "check access keys provided";
            operation = OperationOnFail::RETRY_LATER;
            ADD_COUNTER(mUnauthErrorCnt, 1);
        } else if (sendResult == SEND_PARAMETER_INVALID) {
            failDetail << "invalid parameters";
            suggestion << "check input parameters";
            operation = DefaultOperation(item->mTryCnt);
            ADD_COUNTER(mParamsErrorCnt, 1);
        } else if (sendResult == SEND_INVALID_SEQUENCE_ID) {
            failDetail << "invalid exactly-once sequence id";
            ADD_COUNTER(mSequenceIDErrorCnt, 1);
            // do/while(0) is used only so that the missing-checkpoint case can `break` out early.
            do {
                auto& cpt = data->mExactlyOnceCheckpoint;
                if (!cpt) {
                    failDetail << ", unexpected result when exactly once checkpoint is not found";
                    suggestion << "report bug";
                    AlarmManager::GetInstance()->SendAlarm(
                        EXACTLY_ONCE_ALARM,
                        "drop exactly once log group because of invalid sequence ID, request id:"
                            + slsResponse.mRequestId,
                        mRegion,
                        mProject,
                        mContext ? mContext->GetConfigName() : "",
                        data->mLogstore);
                    operation = OperationOnFail::DISCARD;
                    break;
                }

                // Because hash key is generated by UUID library, we consider that
                //  the possibility of hash key conflict is very low, so data is
                //  dropped here.
                cpt->Commit();
                failDetail << ", drop exactly once log group and commit checkpoint"
                           << " checkpointKey:" << cpt->key << " checkpoint:" << cpt->data.DebugString();
                suggestion << "no suggestion";
                AlarmManager::GetInstance()->SendAlarm(
                    EXACTLY_ONCE_ALARM,
                    "drop exactly once log group because of invalid sequence ID, cpt:" + cpt->key
                        + ", data:" + cpt->data.DebugString() + "request id:" + slsResponse.mRequestId,
                    mRegion,
                    mProject,
                    mContext ? mContext->GetConfigName() : "",
                    data->mLogstore);
                operation = OperationOnFail::DISCARD;
                cpt->IncreaseSequenceID();
            } while (0);
        } else if (AppConfig::GetInstance()->EnableLogTimeAutoAdjust()
                   && LOGE_REQUEST_TIME_EXPIRED == slsResponse.mErrorCode) {
            failDetail << "write request expired, will retry";
            suggestion << "check local system time";
            operation = OperationOnFail::RETRY_IMMEDIATELY;
            ADD_COUNTER(mRequestExpiredErrorCnt, 1);
        } else {
            failDetail << "other error";
            suggestion << "no suggestion";
            // when unknown error such as SignatureNotMatch happens, we should retry several times
            // first time, we will retry immediately
            // then we record error and retry later
            // when retry times > unknow_error_try_max, we will drop this data
            operation = DefaultOperation(item->mTryCnt);
            ADD_COUNTER(mOtherErrorCnt, 1);
        }
        // Overrides applied regardless of the error kind: give up on items that have been queued longer than
        // discard_send_fail_interval seconds, and on profile data that has used up its retry budget.
        if (chrono::duration_cast<chrono::seconds>(curSystemTime - item->mFirstEnqueTime).count()
            > INT32_FLAG(discard_send_fail_interval)) {
            operation = OperationOnFail::DISCARD;
        }
        if (isProfileData && data->mTryCnt >= static_cast<uint32_t>(INT32_FLAG(profile_data_send_retrytimes))) {
            operation = OperationOnFail::DISCARD;
        }

// Key/value list shared by the failure log statements below. It expands to locals of this function, so it is
// undefined again right after the switch. Durations are reported in milliseconds, matching their "ms" suffix and
// the success log above.
#define LOG_PATTERN \
    ("failed to send request", failDetail.str())("operation", GetOperationString(operation))("suggestion", \
                                                                                             suggestion.str())( \
        "item address", item)("request id", slsResponse.mRequestId)("status code", slsResponse.mStatusCode)( \
        "error code", slsResponse.mErrorCode)("errMsg", slsResponse.mErrorMsg)("config", configName)( \
        "region", mRegion)("project", mProject)("logstore", data->mLogstore)("try cnt", data->mTryCnt)( \
        "response time", \
        ToString(chrono::duration_cast<chrono::milliseconds>(curSystemTime - data->mLastSendTime).count()) \
            + "ms")("total send time", \
                    ToString( \
                        chrono::duration_cast<chrono::milliseconds>(curSystemTime - data->mFirstEnqueTime).count()) \
                        + "ms")("endpoint", data->mCurrentHost)("is profile data", isProfileData)

        switch (operation) {
            case OperationOnFail::RETRY_IMMEDIATELY:
                ++item->mTryCnt;
                FlusherRunner::GetInstance()->PushToHttpSink(item, false);
                break;
            case OperationOnFail::RETRY_LATER:
                // Warnings are rate limited per item, except for request timeouts which are always logged.
                if (slsResponse.mErrorCode == LOGE_REQUEST_TIMEOUT
                    || curTime - data->mLastLogWarningTime > ON_FAIL_LOG_WARNING_INTERVAL_SECOND) {
                    LOG_WARNING(sLogger, LOG_PATTERN);
                    data->mLastLogWarningTime = curTime;
                }
                SenderQueueManager::GetInstance()->DecreaseConcurrencyLimiterInSendingCnt(item->mQueueKey);
                DealSenderQueueItemAfterSend(item, true);
                break;
            case OperationOnFail::DISCARD:
                ADD_COUNTER(mDiscardCnt, 1);
                // fall through - discarded items share the log/alarm/release path below
            default:
                LOG_WARNING(sLogger, LOG_PATTERN);
                if (!isProfileData) {
                    AlarmManager::GetInstance()->SendAlarm(
                        SEND_DATA_FAIL_ALARM,
                        "failed to send request: " + failDetail.str() + "\toperation: " + GetOperationString(operation)
                            + "\trequestId: " + slsResponse.mRequestId
                            + "\tstatusCode: " + ToString(slsResponse.mStatusCode)
                            + "\terrorCode: " + slsResponse.mErrorCode + "\terrorMessage: " + slsResponse.mErrorMsg
                            + "\tconfig: " + configName + "\tendpoint: " + data->mCurrentHost,
                        mRegion,
                        mProject,
                        mContext ? mContext->GetConfigName() : "",
                        data->mLogstore);
                }
                SenderQueueManager::GetInstance()->DecreaseConcurrencyLimiterInSendingCnt(item->mQueueKey);
                DealSenderQueueItemAfterSend(item, false);
                break;
        }
#undef LOG_PATTERN
    }
#ifdef __ENTERPRISE__
    // Only a network error marks the host as unhealthy; every other outcome counts as reachable.
    bool hasNetworkError = sendResult == SEND_NETWORK_ERROR;
    EnterpriseSLSClientManager::GetInstance()->UpdateHostStatus(
        mProject, mCandidateHostsInfo->GetMode(), hostname, !hasNetworkError);
    mCandidateHostsInfo->SelectBestHost();

    // Access key / anonymous-write status is updated only when there was no network error.
    if (!hasNetworkError) {
        bool hasAuthError = sendResult == SEND_UNAUTHORIZED;
        EnterpriseSLSClientManager::GetInstance()->UpdateAccessKeyStatus(mAliuid, !hasAuthError);
        EnterpriseSLSClientManager::GetInstance()->UpdateProjectAnonymousWriteStatus(mProject, !hasAuthError);
    }
#endif
}