in core/plugin/flusher/sls/DiskBufferWriter.cpp [483:665]
void DiskBufferWriter::SendEncryptionBuffer(const std::string& filename, int32_t keyVersion) {
string encryption;
string logData;
EncryptionStateMeta meta;
bool readResult;
bool writeBack = false;
int32_t pos = INT32_FLAG(file_encryption_header_length);
sls_logs::LogtailBufferMeta bufferMeta;
int32_t discardCount = 0;
while (ReadNextEncryption(pos, filename, encryption, meta, readResult, bufferMeta)) {
logData.clear();
bool sendResult = false;
if (!readResult || !CheckBufferMetaValidation(filename, bufferMeta)) {
if (meta.mHandled == 1)
continue;
sendResult = true;
discardCount++;
}
if (!sendResult) {
char* des = new char[meta.mLogDataSize];
if (!FileEncryption::GetInstance()->Decrypt(
encryption.c_str(), meta.mEncryptionSize, des, meta.mLogDataSize, keyVersion)) {
sendResult = true;
discardCount++;
LOG_ERROR(sLogger,
("decrypt error, project_name",
bufferMeta.project())("key_version", keyVersion)("meta.mLogDataSize", meta.mLogDataSize));
AlarmManager::GetInstance()->SendAlarm(ENCRYPT_DECRYPT_FAIL_ALARM,
string("decrypt error, project_name:" + bufferMeta.project()
+ ", key_version:" + ToString(keyVersion)
+ ", meta.mLogDataSize:" + ToString(meta.mLogDataSize)),
bufferMeta.region(),
bufferMeta.project(),
"",
bufferMeta.logstore());
} else {
if (bufferMeta.has_logstore())
logData = string(des, meta.mLogDataSize);
else {
// compatible to old buffer file (logGroup string), convert to LZ4 compressed
string logGroupStr = string(des, meta.mLogDataSize);
sls_logs::LogGroup logGroup;
if (!logGroup.ParseFromString(logGroupStr)) {
sendResult = true;
LOG_ERROR(sLogger,
("parse error from string to loggroup, projectName is", bufferMeta.project()));
discardCount++;
AlarmManager::GetInstance()->SendAlarm(
LOG_GROUP_PARSE_FAIL_ALARM,
string("projectName is:" + bufferMeta.project() + ", fileName is:" + filename),
bufferMeta.region(),
bufferMeta.project(),
"",
bufferMeta.logstore());
} else if (!CompressLz4(logGroupStr, logData)) {
sendResult = true;
LOG_ERROR(sLogger, ("LZ4 compress loggroup fail, projectName is", bufferMeta.project()));
discardCount++;
AlarmManager::GetInstance()->SendAlarm(
SEND_COMPRESS_FAIL_ALARM,
string("projectName is:" + bufferMeta.project() + ", fileName is:" + filename),
bufferMeta.region(),
bufferMeta.project(),
"",
bufferMeta.logstore());
} else {
bufferMeta.set_logstore(logGroup.category());
bufferMeta.set_datatype(int(RawDataType::EVENT_GROUP));
bufferMeta.set_rawsize(meta.mLogDataSize);
bufferMeta.set_compresstype(sls_logs::SLS_CMP_LZ4);
bufferMeta.set_telemetrytype(sls_logs::SLS_TELEMETRY_TYPE_LOGS);
}
}
if (!sendResult) {
time_t beginTime = time(nullptr);
while (true) {
string host;
auto response = SendBufferFileData(bufferMeta, logData, host);
SendResult sendRes = SEND_OK;
if (response.mStatusCode != 200) {
sendRes = ConvertErrorCode(response.mErrorCode);
}
switch (sendRes) {
case SEND_OK:
sendResult = true;
break;
case SEND_NETWORK_ERROR:
case SEND_SERVER_ERROR:
if (response.mErrorMsg != kNoHostErrorMsg) {
LOG_WARNING(
sLogger,
("send data to SLS fail", "retry later")("request id", response.mRequestId)(
"error_code", response.mErrorCode)("error_message", response.mErrorMsg)(
"endpoint", host)("projectName", bufferMeta.project())(
"logstore", bufferMeta.logstore())("rawsize", bufferMeta.rawsize()));
}
usleep(INT32_FLAG(send_retry_sleep_interval));
break;
case SEND_QUOTA_EXCEED:
AlarmManager::GetInstance()->SendAlarm(SEND_QUOTA_EXCEED_ALARM,
"error_code: " + response.mErrorCode
+ ", error_message: " + response.mErrorMsg,
bufferMeta.region(),
bufferMeta.project(),
"",
bufferMeta.logstore());
// no region
if (!GetProfileSender()->IsProfileData("", bufferMeta.project(), bufferMeta.logstore()))
LOG_WARNING(
sLogger,
("send data to SLS fail", "retry later")("request id", response.mRequestId)(
"error_code", response.mErrorCode)("error_message", response.mErrorMsg)(
"endpoint", host)("projectName", bufferMeta.project())(
"logstore", bufferMeta.logstore())("rawsize", bufferMeta.rawsize()));
usleep(INT32_FLAG(quota_exceed_wait_interval));
break;
case SEND_UNAUTHORIZED:
usleep(INT32_FLAG(unauthorized_wait_interval));
break;
default:
sendResult = true;
discardCount++;
break;
}
#ifdef __ENTERPRISE__
if (sendRes != SEND_NETWORK_ERROR && sendRes != SEND_SERVER_ERROR) {
bool hasAuthError = sendRes == SEND_UNAUTHORIZED && response.mErrorMsg != kAKErrorMsg;
EnterpriseSLSClientManager::GetInstance()->UpdateAccessKeyStatus(bufferMeta.aliuid(),
!hasAuthError);
EnterpriseSLSClientManager::GetInstance()->UpdateProjectAnonymousWriteStatus(
bufferMeta.project(), !hasAuthError);
}
#endif
if (time(nullptr) - beginTime >= INT32_FLAG(discard_send_fail_interval)) {
sendResult = true;
discardCount++;
}
if (sendResult) {
break;
}
{
lock_guard<mutex> lock(mBufferSenderThreadRunningMux);
if (!mIsSendBufferThreadRunning) {
break;
}
}
}
}
}
delete[] des;
}
if (sendResult)
meta.mHandled = 1;
LOG_DEBUG(sLogger,
("send LogGroup from local buffer file", filename)("rawsize", bufferMeta.rawsize())("sendResult",
sendResult));
WriteBackMeta(pos - meta.mEncryptionSize - sizeof(meta)
- (meta.mEncodedInfoSize > BUFFER_META_BASE_SIZE
? (meta.mEncodedInfoSize - BUFFER_META_BASE_SIZE)
: meta.mEncodedInfoSize),
(char*)&meta,
sizeof(meta),
filename);
if (!sendResult)
writeBack = true;
{
lock_guard<mutex> lock(mBufferSenderThreadRunningMux);
if (!mIsSendBufferThreadRunning) {
return;
}
}
}
if (!writeBack) {
remove(filename.c_str());
if (discardCount > 0) {
LOG_ERROR(sLogger, ("send buffer file, discard LogGroup count", discardCount)("delete file", filename));
AlarmManager::GetInstance()->SendAlarm(DISCARD_SECONDARY_ALARM,
"delete buffer file: " + filename + ", discard "
+ ToString(discardCount) + " logGroups");
} else
LOG_INFO(sLogger, ("send buffer file success, delete buffer file", filename));
}
}