void DiskBufferWriter::SendEncryptionBuffer()

in core/plugin/flusher/sls/DiskBufferWriter.cpp [483:665]


void DiskBufferWriter::SendEncryptionBuffer(const std::string& filename, int32_t keyVersion) {
    string encryption;
    string logData;
    EncryptionStateMeta meta;
    bool readResult;
    bool writeBack = false;
    int32_t pos = INT32_FLAG(file_encryption_header_length);
    sls_logs::LogtailBufferMeta bufferMeta;
    int32_t discardCount = 0;
    while (ReadNextEncryption(pos, filename, encryption, meta, readResult, bufferMeta)) {
        logData.clear();
        bool sendResult = false;
        if (!readResult || !CheckBufferMetaValidation(filename, bufferMeta)) {
            if (meta.mHandled == 1)
                continue;
            sendResult = true;
            discardCount++;
        }
        if (!sendResult) {
            char* des = new char[meta.mLogDataSize];
            if (!FileEncryption::GetInstance()->Decrypt(
                    encryption.c_str(), meta.mEncryptionSize, des, meta.mLogDataSize, keyVersion)) {
                sendResult = true;
                discardCount++;
                LOG_ERROR(sLogger,
                          ("decrypt error, project_name",
                           bufferMeta.project())("key_version", keyVersion)("meta.mLogDataSize", meta.mLogDataSize));
                AlarmManager::GetInstance()->SendAlarm(ENCRYPT_DECRYPT_FAIL_ALARM,
                                                       string("decrypt error, project_name:" + bufferMeta.project()
                                                              + ", key_version:" + ToString(keyVersion)
                                                              + ", meta.mLogDataSize:" + ToString(meta.mLogDataSize)),
                                                       bufferMeta.region(),
                                                       bufferMeta.project(),
                                                       "",
                                                       bufferMeta.logstore());
            } else {
                if (bufferMeta.has_logstore())
                    logData = string(des, meta.mLogDataSize);
                else {
                    // compatible to old buffer file (logGroup string), convert to LZ4 compressed
                    string logGroupStr = string(des, meta.mLogDataSize);
                    sls_logs::LogGroup logGroup;
                    if (!logGroup.ParseFromString(logGroupStr)) {
                        sendResult = true;
                        LOG_ERROR(sLogger,
                                  ("parse error from string to loggroup, projectName is", bufferMeta.project()));
                        discardCount++;
                        AlarmManager::GetInstance()->SendAlarm(
                            LOG_GROUP_PARSE_FAIL_ALARM,
                            string("projectName is:" + bufferMeta.project() + ", fileName is:" + filename),
                            bufferMeta.region(),
                            bufferMeta.project(),
                            "",
                            bufferMeta.logstore());
                    } else if (!CompressLz4(logGroupStr, logData)) {
                        sendResult = true;
                        LOG_ERROR(sLogger, ("LZ4 compress loggroup fail, projectName is", bufferMeta.project()));
                        discardCount++;
                        AlarmManager::GetInstance()->SendAlarm(
                            SEND_COMPRESS_FAIL_ALARM,
                            string("projectName is:" + bufferMeta.project() + ", fileName is:" + filename),
                            bufferMeta.region(),
                            bufferMeta.project(),
                            "",
                            bufferMeta.logstore());
                    } else {
                        bufferMeta.set_logstore(logGroup.category());
                        bufferMeta.set_datatype(int(RawDataType::EVENT_GROUP));
                        bufferMeta.set_rawsize(meta.mLogDataSize);
                        bufferMeta.set_compresstype(sls_logs::SLS_CMP_LZ4);
                        bufferMeta.set_telemetrytype(sls_logs::SLS_TELEMETRY_TYPE_LOGS);
                    }
                }
                if (!sendResult) {
                    time_t beginTime = time(nullptr);
                    while (true) {
                        string host;
                        auto response = SendBufferFileData(bufferMeta, logData, host);
                        SendResult sendRes = SEND_OK;
                        if (response.mStatusCode != 200) {
                            sendRes = ConvertErrorCode(response.mErrorCode);
                        }
                        switch (sendRes) {
                            case SEND_OK:
                                sendResult = true;
                                break;
                            case SEND_NETWORK_ERROR:
                            case SEND_SERVER_ERROR:
                                if (response.mErrorMsg != kNoHostErrorMsg) {
                                    LOG_WARNING(
                                        sLogger,
                                        ("send data to SLS fail", "retry later")("request id", response.mRequestId)(
                                            "error_code", response.mErrorCode)("error_message", response.mErrorMsg)(
                                            "endpoint", host)("projectName", bufferMeta.project())(
                                            "logstore", bufferMeta.logstore())("rawsize", bufferMeta.rawsize()));
                                }
                                usleep(INT32_FLAG(send_retry_sleep_interval));
                                break;
                            case SEND_QUOTA_EXCEED:
                                AlarmManager::GetInstance()->SendAlarm(SEND_QUOTA_EXCEED_ALARM,
                                                                       "error_code: " + response.mErrorCode
                                                                           + ", error_message: " + response.mErrorMsg,
                                                                       bufferMeta.region(),
                                                                       bufferMeta.project(),
                                                                       "",
                                                                       bufferMeta.logstore());
                                // no region
                                if (!GetProfileSender()->IsProfileData("", bufferMeta.project(), bufferMeta.logstore()))
                                    LOG_WARNING(
                                        sLogger,
                                        ("send data to SLS fail", "retry later")("request id", response.mRequestId)(
                                            "error_code", response.mErrorCode)("error_message", response.mErrorMsg)(
                                            "endpoint", host)("projectName", bufferMeta.project())(
                                            "logstore", bufferMeta.logstore())("rawsize", bufferMeta.rawsize()));
                                usleep(INT32_FLAG(quota_exceed_wait_interval));
                                break;
                            case SEND_UNAUTHORIZED:
                                usleep(INT32_FLAG(unauthorized_wait_interval));
                                break;
                            default:
                                sendResult = true;
                                discardCount++;
                                break;
                        }
#ifdef __ENTERPRISE__
                        if (sendRes != SEND_NETWORK_ERROR && sendRes != SEND_SERVER_ERROR) {
                            bool hasAuthError = sendRes == SEND_UNAUTHORIZED && response.mErrorMsg != kAKErrorMsg;
                            EnterpriseSLSClientManager::GetInstance()->UpdateAccessKeyStatus(bufferMeta.aliuid(),
                                                                                             !hasAuthError);
                            EnterpriseSLSClientManager::GetInstance()->UpdateProjectAnonymousWriteStatus(
                                bufferMeta.project(), !hasAuthError);
                        }
#endif
                        if (time(nullptr) - beginTime >= INT32_FLAG(discard_send_fail_interval)) {
                            sendResult = true;
                            discardCount++;
                        }
                        if (sendResult) {
                            break;
                        }
                        {
                            lock_guard<mutex> lock(mBufferSenderThreadRunningMux);
                            if (!mIsSendBufferThreadRunning) {
                                break;
                            }
                        }
                    }
                }
            }
            delete[] des;
        }
        if (sendResult)
            meta.mHandled = 1;
        LOG_DEBUG(sLogger,
                  ("send LogGroup from local buffer file", filename)("rawsize", bufferMeta.rawsize())("sendResult",
                                                                                                      sendResult));
        WriteBackMeta(pos - meta.mEncryptionSize - sizeof(meta)
                          - (meta.mEncodedInfoSize > BUFFER_META_BASE_SIZE
                                 ? (meta.mEncodedInfoSize - BUFFER_META_BASE_SIZE)
                                 : meta.mEncodedInfoSize),
                      (char*)&meta,
                      sizeof(meta),
                      filename);
        if (!sendResult)
            writeBack = true;
        {
            lock_guard<mutex> lock(mBufferSenderThreadRunningMux);
            if (!mIsSendBufferThreadRunning) {
                return;
            }
        }
    }
    if (!writeBack) {
        remove(filename.c_str());
        if (discardCount > 0) {
            LOG_ERROR(sLogger, ("send buffer file, discard LogGroup count", discardCount)("delete file", filename));
            AlarmManager::GetInstance()->SendAlarm(DISCARD_SECONDARY_ALARM,
                                                   "delete buffer file: " + filename + ", discard "
                                                       + ToString(discardCount) + " logGroups");
        } else
            LOG_INFO(sLogger, ("send buffer file success, delete buffer file", filename));
    }
}