core/plugin/flusher/sls/DiskBufferWriter.cpp (904 lines of code) (raw):

// Copyright 2024 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "plugin/flusher/sls/DiskBufferWriter.h" #include <cstddef> #include "Flags.h" #include "app_config/AppConfig.h" #include "application/Application.h" #include "collection_pipeline/limiter/RateLimiter.h" #include "collection_pipeline/queue/QueueKeyManager.h" #include "collection_pipeline/queue/SLSSenderQueueItem.h" #include "common/CompressTools.h" #include "common/ErrorUtil.h" #include "common/FileEncryption.h" #include "common/FileSystemUtil.h" #include "common/RuntimeUtil.h" #include "common/StringTools.h" #include "logger/Logger.h" #include "monitor/AlarmManager.h" #include "plugin/flusher/sls/FlusherSLS.h" #include "plugin/flusher/sls/SLSClientManager.h" #include "plugin/flusher/sls/SLSConstant.h" #include "plugin/flusher/sls/SendResult.h" #include "protobuf/sls/sls_logs.pb.h" #include "provider/Provider.h" #ifdef __ENTERPRISE__ #include "plugin/flusher/sls/EnterpriseSLSClientManager.h" #endif DEFINE_FLAG_INT32(write_secondary_wait_timeout, "interval of dump seconary buffer from memory to file, seconds", 2); DEFINE_FLAG_INT32(buffer_file_alive_interval, "the max alive time of a bufferfile, 5 minutes", 300); DEFINE_FLAG_INT32(log_expire_time, "log expire time", 24 * 3600); DEFINE_FLAG_INT32(quota_exceed_wait_interval, "when daemon buffer thread get quotaExceed error, sleep 5 seconds", 5); DEFINE_FLAG_INT32(secondary_buffer_count_limit, "data ready for write buffer file", 20); DEFINE_FLAG_INT32(send_retry_sleep_interval, "sleep microseconds when sync send fail, 50ms", 50000); DEFINE_FLAG_INT32(buffer_check_period, "check logtail local storage buffer period", 60); DEFINE_FLAG_INT32(unauthorized_wait_interval, "", 1); DEFINE_FLAG_INT32(send_retrytimes, "how many times should retry if PostLogStoreLogs operation fail", 3); DECLARE_FLAG_INT32(discard_send_fail_interval); using namespace std; namespace logtail { #ifdef __ENTERPRISE__ static EndpointMode GetEndpointMode(sls_logs::EndpointMode mode) { switch (mode) { case sls_logs::EndpointMode::DEFAULT: return EndpointMode::DEFAULT; case sls_logs::EndpointMode::ACCELERATE: return EndpointMode::ACCELERATE; case sls_logs::EndpointMode::CUSTOM: return EndpointMode::CUSTOM; } return EndpointMode::DEFAULT; } static sls_logs::EndpointMode GetEndpointMode(EndpointMode mode) { switch (mode) { case EndpointMode::DEFAULT: return sls_logs::EndpointMode::DEFAULT; case EndpointMode::ACCELERATE: return sls_logs::EndpointMode::ACCELERATE; case EndpointMode::CUSTOM: return sls_logs::EndpointMode::CUSTOM; } return sls_logs::EndpointMode::DEFAULT; } static const string kAKErrorMsg = "can not get valid access key"; #endif static const string kNoHostErrorMsg = "can not get available host"; static const string& GetSLSCompressTypeString(sls_logs::SlsCompressType compressType) { switch (compressType) { case sls_logs::SLS_CMP_NONE: { static string none = ""; return none; } case sls_logs::SLS_CMP_ZSTD: { static string zstd = "zstd"; return zstd; } default: { static string lz4 = "lz4"; return lz4; } } } const int32_t DiskBufferWriter::BUFFER_META_BASE_SIZE = 65536; const size_t DiskBufferWriter::BUFFER_META_MAX_SIZE = 1 * 1024 * 1024; void DiskBufferWriter::Init() { mBufferDivideTime = time(NULL); mCheckPeriod = INT32_FLAG(buffer_check_period); SetBufferFilePath(AppConfig::GetInstance()->GetBufferFilePath()); mBufferSenderThreadRes = async(launch::async, &DiskBufferWriter::BufferSenderThread, this); mBufferWriterThreadRes = async(launch::async, &DiskBufferWriter::BufferWriterThread, this); } void DiskBufferWriter::Stop() { // stop buffer writer mIsFlush = true; // stop buffer sender { lock_guard<mutex> lock(mBufferSenderThreadRunningMux); mIsSendBufferThreadRunning = false; } mStopCV.notify_one(); if (mBufferWriterThreadRes.valid()) { future_status s = mBufferWriterThreadRes.wait_for(chrono::seconds(5)); if (s == future_status::ready) { LOG_INFO(sLogger, ("disk buffer writer", "stopped successfully")); } else { LOG_WARNING(sLogger, ("disk buffer writer", "forced to stopped")); } } if (mBufferSenderThreadRes.valid()) { // timeout should be larger than network timeout, which is 15 for now future_status s = mBufferSenderThreadRes.wait_for(chrono::seconds(20)); if (s == future_status::ready) { LOG_INFO(sLogger, ("disk buffer sender", "stopped successfully")); } else { LOG_WARNING(sLogger, ("disk buffer sender", "forced to stopped")); } } } bool DiskBufferWriter::PushToDiskBuffer(SenderQueueItem* item, uint32_t retryTimes) { auto slsItem = static_cast<SLSSenderQueueItem*>(item); uint32_t retry = 0; while (++retry < retryTimes) { if (Application::GetInstance()->IsExiting() || mQueue.Size() < static_cast<size_t>(INT32_FLAG(secondary_buffer_count_limit))) { if (slsItem->mExactlyOnceCheckpoint == nullptr) { // explicitly clone the data to avoid dataPtr be destructed by queue mQueue.Push(item->Clone()); } return true; } this_thread::sleep_for(chrono::milliseconds(50)); } auto flusher = static_cast<const FlusherSLS*>(slsItem->mFlusher); LOG_WARNING(sLogger, ("failed to add sender queue item to disk buffer writer", "queue is full")("action", "discard data")( "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(item->mFlusher->GetQueueKey()))); AlarmManager::GetInstance()->SendAlarm( DISCARD_DATA_ALARM, "failed to add sender queue item to disk buffer writer: queue is full\taction: discard data", flusher->mRegion, flusher->mProject, "", slsItem->mLogstore); return false; } void DiskBufferWriter::BufferWriterThread() { LOG_INFO(sLogger, ("disk buffer writer", "started")); vector<SenderQueueItem*> res; while (true) { if (!mQueue.WaitAndPopAll(res, INT32_FLAG(write_secondary_wait_timeout) * 1000)) { if (mIsFlush && mQueue.Empty()) { break; } } // update bufferDiveideTime to flush data; buffer file before bufferDiveideTime will be ready for read if (time(NULL) - mBufferDivideTime > INT32_FLAG(buffer_file_alive_interval)) { CreateNewFile(); } if (!res.empty()) { for (auto itr = res.begin(); itr != res.end(); ++itr) { SendToBufferFile(*itr); delete *itr; } res.clear(); } } } void DiskBufferWriter::BufferSenderThread() { LOG_INFO(sLogger, ("disk buffer sender", "started")); unique_lock<mutex> lock(mBufferSenderThreadRunningMux); while (mIsSendBufferThreadRunning) { vector<string> filesToSend; if (!LoadFileToSend(mBufferDivideTime, filesToSend)) { if (mStopCV.wait_for( lock, chrono::seconds(mCheckPeriod), [this]() { return !mIsSendBufferThreadRunning; })) { break; } continue; } lock.unlock(); // mIsSendingBuffer = true; int32_t fileToSendCount = int32_t(filesToSend.size()); int32_t bufferFileNumValue = AppConfig::GetInstance()->GetNumOfBufferFile(); for (int32_t i = (fileToSendCount > bufferFileNumValue ? fileToSendCount - bufferFileNumValue : 0); i < fileToSendCount && mIsSendBufferThreadRunning; ++i) { string fileName = GetBufferFilePath() + filesToSend[i]; unordered_map<string, string> kvMap; if (FileEncryption::CheckHeader(fileName, kvMap)) { int32_t keyVersion = -1; if (kvMap.find(STRING_FLAG(file_encryption_field_key_version)) != kvMap.end()) { if (!StringTo(kvMap[STRING_FLAG(file_encryption_field_key_version)], keyVersion)) { LOG_ERROR(sLogger, ("convert key_version to int32_t fail", kvMap[STRING_FLAG(file_encryption_field_key_version)])); } } if (keyVersion >= 1 && keyVersion <= FileEncryption::GetInstance()->GetDefaultKeyVersion()) { LOG_INFO(sLogger, ("check local encryption file", fileName)("key_version", keyVersion)); SendEncryptionBuffer(fileName, keyVersion); } else { remove(fileName.c_str()); LOG_ERROR(sLogger, ("invalid key_version in header", kvMap[STRING_FLAG(file_encryption_field_key_version)])("delete bufffer file", fileName)); AlarmManager::GetInstance()->SendAlarm( DISCARD_SECONDARY_ALARM, "key version in buffer file invalid, delete file: " + fileName); } } else { remove(fileName.c_str()); LOG_WARNING(sLogger, ("check header of buffer file failed, delete file", fileName)); AlarmManager::GetInstance()->SendAlarm(DISCARD_SECONDARY_ALARM, "check header of buffer file failed, delete file: " + fileName); } } #ifdef __ENTERPRISE__ mCandidateHostsInfos.clear(); #endif // mIsSendingBuffer = false; lock.lock(); if (mStopCV.wait_for(lock, chrono::seconds(mCheckPeriod), [this]() { return !mIsSendBufferThreadRunning; })) { break; } } } void DiskBufferWriter::SetBufferFilePath(const std::string& bufferfilepath) { lock_guard<mutex> lock(mBufferFileLock); if (bufferfilepath == "") { mBufferFilePath = GetBufferFileDir(); } else mBufferFilePath = bufferfilepath; if (mBufferFilePath[mBufferFilePath.size() - 1] != PATH_SEPARATOR[0]) mBufferFilePath += PATH_SEPARATOR; mBufferFileName = ""; } std::string DiskBufferWriter::GetBufferFilePath() { lock_guard<mutex> lock(mBufferFileLock); return mBufferFilePath; } void DiskBufferWriter::SetBufferFileName(const std::string& filename) { lock_guard<mutex> lock(mBufferFileLock); mBufferFileName = filename; } std::string DiskBufferWriter::GetBufferFileName() { lock_guard<mutex> lock(mBufferFileLock); return mBufferFileName; } bool DiskBufferWriter::LoadFileToSend(time_t timeLine, std::vector<std::string>& filesToSend) { string bufferFilePath = GetBufferFilePath(); if (!CheckExistance(bufferFilePath)) { if (GetBufferFileDir().find(bufferFilePath) != 0) { LOG_WARNING(sLogger, ("buffer file path not exist", bufferFilePath)("logtail will not recreate external path", "local secondary does not work")); AlarmManager::GetInstance()->SendAlarm(SECONDARY_READ_WRITE_ALARM, string("buffer file directory:") + bufferFilePath + " not exist"); return false; } string errorMessage; if (!RebuildExecutionDir(AppConfig::GetInstance()->GetIlogtailConfigJson(), errorMessage)) { LOG_ERROR(sLogger, ("failed to rebuild buffer file path", bufferFilePath)("errorMessage", errorMessage)); AlarmManager::GetInstance()->SendAlarm(SECONDARY_READ_WRITE_ALARM, errorMessage); return false; } else LOG_INFO(sLogger, ("rebuild buffer file path success", bufferFilePath)); } fsutil::Dir dir(bufferFilePath); if (!dir.Open()) { string errorStr = ErrnoToString(GetErrno()); LOG_ERROR(sLogger, ("open dir error", bufferFilePath)("reason", errorStr)); AlarmManager::GetInstance()->SendAlarm(SECONDARY_READ_WRITE_ALARM, string("open dir error,dir:") + bufferFilePath + ",error:" + errorStr); return false; } fsutil::Entry ent; while ((ent = dir.ReadNext())) { string filename = ent.Name(); if (filename.find(GetSendBufferFileNamePrefix()) == 0) { int32_t filetime{}; if (!StringTo(filename.substr(GetSendBufferFileNamePrefix().size()), filetime)) { LOG_INFO(sLogger, ("can not get file time from file name", filename)); continue; } if (filetime < timeLine) { filesToSend.push_back(filename); } } } sort(filesToSend.begin(), filesToSend.end()); return true; } bool DiskBufferWriter::ReadNextEncryption(int32_t& pos, const std::string& filename, std::string& encryption, EncryptionStateMeta& meta, bool& readResult, sls_logs::LogtailBufferMeta& bufferMeta) { bufferMeta.Clear(); readResult = false; encryption.clear(); int retryTimes = 0; FILE* fin = NULL; while (true) { retryTimes++; fin = FileReadOnlyOpen(filename.c_str(), "rb"); if (fin) break; if (retryTimes >= 3) { string errorStr = ErrnoToString(GetErrno()); AlarmManager::GetInstance()->SendAlarm(SECONDARY_READ_WRITE_ALARM, string("open file error:") + filename + ",error:" + errorStr); LOG_ERROR(sLogger, ("open file error", filename)("error", errorStr)); return false; } usleep(5000); } fseek(fin, 0, SEEK_END); auto const currentSize = ftell(fin); if (currentSize == pos) { fclose(fin); return false; } fseek(fin, pos, SEEK_SET); auto nbytes = fread(static_cast<void*>(&meta), sizeof(char), sizeof(meta), fin); if (nbytes != sizeof(meta)) { string errorStr = ErrnoToString(GetErrno()); AlarmManager::GetInstance()->SendAlarm(SECONDARY_READ_WRITE_ALARM, string("read encryption file meta error:") + filename + ", error:" + errorStr + ", meta.mEncryptionSize:" + ToString(meta.mEncryptionSize) + ", nbytes: " + ToString(nbytes) + ", pos: " + ToString(pos) + ", ftell: " + ToString(currentSize)); LOG_ERROR(sLogger, ("read encryption file meta error", filename)("error", errorStr)("nbytes", nbytes)("pos", pos)("ftell", currentSize)); fclose(fin); return false; } bool pbMeta = false; int32_t encodedInfoSize = meta.mEncodedInfoSize; if (encodedInfoSize > BUFFER_META_BASE_SIZE) { encodedInfoSize -= BUFFER_META_BASE_SIZE; pbMeta = true; } if (meta.mEncryptionSize < 0 || encodedInfoSize < 0) { AlarmManager::GetInstance()->SendAlarm(SECONDARY_READ_WRITE_ALARM, string("meta of encryption file invalid:" + filename + ", meta.mEncryptionSize:" + ToString(meta.mEncryptionSize) + ", meta.mEncodedInfoSize:" + ToString(meta.mEncodedInfoSize))); LOG_ERROR(sLogger, ("meta of encryption file invalid", filename)("meta.mEncryptionSize", meta.mEncryptionSize)( "meta.mEncodedInfoSize", meta.mEncodedInfoSize)); fclose(fin); return false; } pos += sizeof(meta) + encodedInfoSize + meta.mEncryptionSize; if ((time(NULL) - meta.mTimeStamp) > INT32_FLAG(log_expire_time) || meta.mHandled == 1) { fclose(fin); if (meta.mHandled != 1) { LOG_WARNING(sLogger, ("timeout buffer file, meta.mTimeStamp", meta.mTimeStamp)); AlarmManager::GetInstance()->SendAlarm(DISCARD_SECONDARY_ALARM, "buffer file timeout (1day), delete file: " + filename); } return true; } char* buffer = new char[encodedInfoSize + 1]; nbytes = fread(buffer, sizeof(char), encodedInfoSize, fin); if (nbytes != static_cast<size_t>(encodedInfoSize)) { fclose(fin); string errorStr = ErrnoToString(GetErrno()); AlarmManager::GetInstance()->SendAlarm(SECONDARY_READ_WRITE_ALARM, string("read projectname from file error:") + filename + ", error:" + errorStr + ", meta.mEncodedInfoSize:" + ToString(meta.mEncodedInfoSize) + ", nbytes:" + ToString(nbytes)); LOG_ERROR(sLogger, ("read encodedInfo from file error", filename)("error", errorStr)("meta.mEncodedInfoSize", meta.mEncodedInfoSize)("nbytes", nbytes)); delete[] buffer; return true; } string encodedInfo = string(buffer, encodedInfoSize); delete[] buffer; if (pbMeta) { if (!bufferMeta.ParseFromString(encodedInfo)) { fclose(fin); AlarmManager::GetInstance()->SendAlarm(SECONDARY_READ_WRITE_ALARM, string("parse buffer meta from file error:") + filename); LOG_ERROR(sLogger, ("parse buffer meta from file error", filename)("buffer meta", encodedInfo)); bufferMeta.Clear(); return true; } } else { bufferMeta.set_project(encodedInfo); bufferMeta.set_region(FlusherSLS::GetDefaultRegion()); // new mode bufferMeta.set_aliuid(""); } if (!bufferMeta.has_compresstype()) { bufferMeta.set_compresstype(sls_logs::SlsCompressType::SLS_CMP_LZ4); } if (!bufferMeta.has_telemetrytype()) { bufferMeta.set_telemetrytype(sls_logs::SLS_TELEMETRY_TYPE_LOGS); } #ifdef __ENTERPRISE__ if (!bufferMeta.has_endpointmode()) { bufferMeta.set_endpointmode(sls_logs::EndpointMode::DEFAULT); } #endif if (!bufferMeta.has_endpoint()) { bufferMeta.set_endpoint(""); } buffer = new char[meta.mEncryptionSize + 1]; nbytes = fread(buffer, sizeof(char), meta.mEncryptionSize, fin); if (nbytes != static_cast<size_t>(meta.mEncryptionSize)) { fclose(fin); string errorStr = ErrnoToString(GetErrno()); AlarmManager::GetInstance()->SendAlarm(SECONDARY_READ_WRITE_ALARM, string("read encryption from file error:") + filename + ",error:" + errorStr + ",meta.mEncryptionSize:" + ToString(meta.mEncryptionSize) + ", nbytes:" + ToString(nbytes), bufferMeta.region(), bufferMeta.project(), "", bufferMeta.logstore()); LOG_ERROR(sLogger, ("read encryption from file error", filename)("error", errorStr)("meta.mEncryptionSize", meta.mEncryptionSize)("nbytes", nbytes)); delete[] buffer; return true; } encryption = string(buffer, meta.mEncryptionSize); readResult = true; delete[] buffer; fclose(fin); return true; } void DiskBufferWriter::SendEncryptionBuffer(const std::string& filename, int32_t keyVersion) { string encryption; string logData; EncryptionStateMeta meta; bool readResult; bool writeBack = false; int32_t pos = INT32_FLAG(file_encryption_header_length); sls_logs::LogtailBufferMeta bufferMeta; int32_t discardCount = 0; while (ReadNextEncryption(pos, filename, encryption, meta, readResult, bufferMeta)) { logData.clear(); bool sendResult = false; if (!readResult || !CheckBufferMetaValidation(filename, bufferMeta)) { if (meta.mHandled == 1) continue; sendResult = true; discardCount++; } if (!sendResult) { char* des = new char[meta.mLogDataSize]; if (!FileEncryption::GetInstance()->Decrypt( encryption.c_str(), meta.mEncryptionSize, des, meta.mLogDataSize, keyVersion)) { sendResult = true; discardCount++; LOG_ERROR(sLogger, ("decrypt error, project_name", bufferMeta.project())("key_version", keyVersion)("meta.mLogDataSize", meta.mLogDataSize)); AlarmManager::GetInstance()->SendAlarm(ENCRYPT_DECRYPT_FAIL_ALARM, string("decrypt error, project_name:" + bufferMeta.project() + ", key_version:" + ToString(keyVersion) + ", meta.mLogDataSize:" + ToString(meta.mLogDataSize)), bufferMeta.region(), bufferMeta.project(), "", bufferMeta.logstore()); } else { if (bufferMeta.has_logstore()) logData = string(des, meta.mLogDataSize); else { // compatible to old buffer file (logGroup string), convert to LZ4 compressed string logGroupStr = string(des, meta.mLogDataSize); sls_logs::LogGroup logGroup; if (!logGroup.ParseFromString(logGroupStr)) { sendResult = true; LOG_ERROR(sLogger, ("parse error from string to loggroup, projectName is", bufferMeta.project())); discardCount++; AlarmManager::GetInstance()->SendAlarm( LOG_GROUP_PARSE_FAIL_ALARM, string("projectName is:" + bufferMeta.project() + ", fileName is:" + filename), bufferMeta.region(), bufferMeta.project(), "", bufferMeta.logstore()); } else if (!CompressLz4(logGroupStr, logData)) { sendResult = true; LOG_ERROR(sLogger, ("LZ4 compress loggroup fail, projectName is", bufferMeta.project())); discardCount++; AlarmManager::GetInstance()->SendAlarm( SEND_COMPRESS_FAIL_ALARM, string("projectName is:" + bufferMeta.project() + ", fileName is:" + filename), bufferMeta.region(), bufferMeta.project(), "", bufferMeta.logstore()); } else { bufferMeta.set_logstore(logGroup.category()); bufferMeta.set_datatype(int(RawDataType::EVENT_GROUP)); bufferMeta.set_rawsize(meta.mLogDataSize); bufferMeta.set_compresstype(sls_logs::SLS_CMP_LZ4); bufferMeta.set_telemetrytype(sls_logs::SLS_TELEMETRY_TYPE_LOGS); } } if (!sendResult) { time_t beginTime = time(nullptr); while (true) { string host; auto response = SendBufferFileData(bufferMeta, logData, host); SendResult sendRes = SEND_OK; if (response.mStatusCode != 200) { sendRes = ConvertErrorCode(response.mErrorCode); } switch (sendRes) { case SEND_OK: sendResult = true; break; case SEND_NETWORK_ERROR: case SEND_SERVER_ERROR: if (response.mErrorMsg != kNoHostErrorMsg) { LOG_WARNING( sLogger, ("send data to SLS fail", "retry later")("request id", response.mRequestId)( "error_code", response.mErrorCode)("error_message", response.mErrorMsg)( "endpoint", host)("projectName", bufferMeta.project())( "logstore", bufferMeta.logstore())("rawsize", bufferMeta.rawsize())); } usleep(INT32_FLAG(send_retry_sleep_interval)); break; case SEND_QUOTA_EXCEED: AlarmManager::GetInstance()->SendAlarm(SEND_QUOTA_EXCEED_ALARM, "error_code: " + response.mErrorCode + ", error_message: " + response.mErrorMsg, bufferMeta.region(), bufferMeta.project(), "", bufferMeta.logstore()); // no region if (!GetProfileSender()->IsProfileData("", bufferMeta.project(), bufferMeta.logstore())) LOG_WARNING( sLogger, ("send data to SLS fail", "retry later")("request id", response.mRequestId)( "error_code", response.mErrorCode)("error_message", response.mErrorMsg)( "endpoint", host)("projectName", bufferMeta.project())( "logstore", bufferMeta.logstore())("rawsize", bufferMeta.rawsize())); usleep(INT32_FLAG(quota_exceed_wait_interval)); break; case SEND_UNAUTHORIZED: usleep(INT32_FLAG(unauthorized_wait_interval)); break; default: sendResult = true; discardCount++; break; } #ifdef __ENTERPRISE__ if (sendRes != SEND_NETWORK_ERROR && sendRes != SEND_SERVER_ERROR) { bool hasAuthError = sendRes == SEND_UNAUTHORIZED && response.mErrorMsg != kAKErrorMsg; EnterpriseSLSClientManager::GetInstance()->UpdateAccessKeyStatus(bufferMeta.aliuid(), !hasAuthError); EnterpriseSLSClientManager::GetInstance()->UpdateProjectAnonymousWriteStatus( bufferMeta.project(), !hasAuthError); } #endif if (time(nullptr) - beginTime >= INT32_FLAG(discard_send_fail_interval)) { sendResult = true; discardCount++; } if (sendResult) { break; } { lock_guard<mutex> lock(mBufferSenderThreadRunningMux); if (!mIsSendBufferThreadRunning) { break; } } } } } delete[] des; } if (sendResult) meta.mHandled = 1; LOG_DEBUG(sLogger, ("send LogGroup from local buffer file", filename)("rawsize", bufferMeta.rawsize())("sendResult", sendResult)); WriteBackMeta(pos - meta.mEncryptionSize - sizeof(meta) - (meta.mEncodedInfoSize > BUFFER_META_BASE_SIZE ? (meta.mEncodedInfoSize - BUFFER_META_BASE_SIZE) : meta.mEncodedInfoSize), (char*)&meta, sizeof(meta), filename); if (!sendResult) writeBack = true; { lock_guard<mutex> lock(mBufferSenderThreadRunningMux); if (!mIsSendBufferThreadRunning) { return; } } } if (!writeBack) { remove(filename.c_str()); if (discardCount > 0) { LOG_ERROR(sLogger, ("send buffer file, discard LogGroup count", discardCount)("delete file", filename)); AlarmManager::GetInstance()->SendAlarm(DISCARD_SECONDARY_ALARM, "delete buffer file: " + filename + ", discard " + ToString(discardCount) + " logGroups"); } else LOG_INFO(sLogger, ("send buffer file success, delete buffer file", filename)); } } // file is not really created when call CreateNewFile(), file created happened when SendToBufferFile() first called bool DiskBufferWriter::CreateNewFile() { vector<string> filesToSend; int64_t currentTime = time(NULL); if (!LoadFileToSend(currentTime, filesToSend)) return false; int32_t bufferFileNumValue = AppConfig::GetInstance()->GetNumOfBufferFile(); for (int32_t i = 0; i < (int32_t)filesToSend.size() - bufferFileNumValue; ++i) { string fileName = GetBufferFilePath() + filesToSend[i]; if (CheckExistance(fileName)) { remove(fileName.c_str()); LOG_ERROR(sLogger, ("buffer file count exceed limit", "file created earlier will be cleaned, and new file will create for new log data")("delete file", fileName)); AlarmManager::GetInstance()->SendAlarm(DISCARD_SECONDARY_ALARM, "buffer file count exceed, delete file: " + fileName); } } mBufferDivideTime = currentTime; SetBufferFileName(GetBufferFilePath() + GetSendBufferFileNamePrefix() + ToString(currentTime)); return true; } bool DiskBufferWriter::WriteBackMeta(int32_t pos, const void* buf, int32_t length, const string& filename) { // TODO: Why not use fopen or fstream??? // TODO: Make sure and merge them. #if defined(__linux__) int fd = open(filename.c_str(), O_WRONLY); if (fd < 0) { string errorStr = ErrnoToString(GetErrno()); AlarmManager::GetInstance()->SendAlarm(SECONDARY_READ_WRITE_ALARM, string("open secondary file for write meta fail:") + filename + ",reason:" + errorStr); LOG_ERROR(sLogger, ("open file error", filename)); return false; } lseek(fd, pos, SEEK_SET); if (write(fd, buf, length) < 0) { string errorStr = ErrnoToString(GetErrno()); AlarmManager::GetInstance()->SendAlarm(SECONDARY_READ_WRITE_ALARM, string("write secondary file for write meta fail:") + filename + ",reason:" + errorStr); LOG_ERROR(sLogger, ("can not write back meta", filename)); } close(fd); return true; #elif defined(_MSC_VER) FILE* f = FileWriteOnlyOpen(filename.c_str(), "wb"); if (!f) { string errorStr = ErrnoToString(GetErrno()); AlarmManager::GetInstance()->SendAlarm(SECONDARY_READ_WRITE_ALARM, string("open secondary file for write meta fail:") + filename + ",reason:" + errorStr); LOG_ERROR(sLogger, ("open file error", filename)); return false; } fseek(f, pos, SEEK_SET); auto nbytes = fwrite(buf, 1, length, f); if (nbytes != length) { string errorStr = ErrnoToString(GetErrno()); AlarmManager::GetInstance()->SendAlarm(SECONDARY_READ_WRITE_ALARM, string("write secondary file for write meta fail:") + filename + ",reason:" + errorStr); LOG_ERROR(sLogger, ("can not write back meta", filename)); } fclose(f); return true; #endif } string DiskBufferWriter::GetBufferFileHeader() { string reserve = STRING_FLAG(file_encryption_field_key_version) + STRING_FLAG(file_encryption_key_value_splitter) + ToString(FileEncryption::GetInstance()->GetDefaultKeyVersion()); string nullHeader = string( (INT32_FLAG(file_encryption_header_length) - STRING_FLAG(file_encryption_magic_number).size() - reserve.size()), '\0'); return (STRING_FLAG(file_encryption_magic_number) + reserve + nullHeader); } bool DiskBufferWriter::SendToBufferFile(SenderQueueItem* dataPtr) { auto data = static_cast<SLSSenderQueueItem*>(dataPtr); auto flusher = static_cast<const FlusherSLS*>(data->mFlusher); string bufferFileName = GetBufferFileName(); if (bufferFileName.empty()) { CreateNewFile(); bufferFileName = GetBufferFileName(); } // if file not exist, create it new FILE* fout = FileAppendOpen(bufferFileName.c_str(), "ab"); if (!fout) { string errorStr = ErrnoToString(GetErrno()); AlarmManager::GetInstance()->SendAlarm(SECONDARY_READ_WRITE_ALARM, string("open file error:") + bufferFileName + ",error:" + errorStr, flusher->mRegion, flusher->mProject, "", data->mLogstore); LOG_ERROR(sLogger, ("open buffer file error", bufferFileName)); return false; } if (ftell(fout) == (streampos)0) { string header = GetBufferFileHeader(); auto nbytes = fwrite(header.c_str(), 1, header.size(), fout); if (header.size() != nbytes) { string errorStr = ErrnoToString(GetErrno()); AlarmManager::GetInstance()->SendAlarm(SECONDARY_READ_WRITE_ALARM, string("write file error:") + bufferFileName + ", error:" + errorStr + ", nbytes:" + ToString(nbytes), flusher->mRegion, flusher->mProject, "", data->mLogstore); LOG_ERROR(sLogger, ("error write encryption header", bufferFileName)("error", errorStr)("nbytes", nbytes)); fclose(fout); return false; } } char* des; int32_t desLength; if (!FileEncryption::GetInstance()->Encrypt(data->mData.c_str(), data->mData.size(), des, desLength)) { fclose(fout); LOG_ERROR(sLogger, ("encrypt error, project_name", flusher->mProject)); AlarmManager::GetInstance()->SendAlarm(ENCRYPT_DECRYPT_FAIL_ALARM, string("encrypt error, project_name:" + flusher->mProject), flusher->mRegion, flusher->mProject, "", data->mLogstore); return false; } sls_logs::LogtailBufferMeta bufferMeta; bufferMeta.set_project(flusher->mProject); bufferMeta.set_region(flusher->mRegion); bufferMeta.set_aliuid(flusher->mAliuid); bufferMeta.set_logstore(data->mLogstore); bufferMeta.set_datatype(int32_t(data->mType)); bufferMeta.set_rawsize(data->mRawSize); bufferMeta.set_shardhashkey(data->mShardHashKey); bufferMeta.set_compresstype(ConvertCompressType(flusher->GetCompressType())); bufferMeta.set_telemetrytype(flusher->mTelemetryType); bufferMeta.set_subpath(flusher->GetSubpath()); #ifdef __ENTERPRISE__ bufferMeta.set_endpointmode(GetEndpointMode(flusher->mEndpointMode)); #endif bufferMeta.set_endpoint(flusher->mEndpoint); string encodedInfo; bufferMeta.SerializeToString(&encodedInfo); EncryptionStateMeta meta; int32_t encodedInfoSize = encodedInfo.size(); meta.mEncodedInfoSize = encodedInfoSize + BUFFER_META_BASE_SIZE; meta.mLogDataSize = data->mData.size(); meta.mTimeStamp = time(NULL); meta.mHandled = 0; meta.mRetryTime = 0; meta.mEncryptionSize = desLength; char* buffer = new char[sizeof(meta) + encodedInfoSize + meta.mEncryptionSize]; memcpy(buffer, (char*)&meta, sizeof(meta)); memcpy(buffer + sizeof(meta), encodedInfo.c_str(), encodedInfoSize); memcpy(buffer + sizeof(meta) + encodedInfoSize, des, desLength); delete[] des; const auto bytesToWrite = sizeof(meta) + encodedInfoSize + meta.mEncryptionSize; auto nbytes = fwrite(buffer, 1, bytesToWrite, fout); if (nbytes != bytesToWrite) { string errorStr = ErrnoToString(GetErrno()); AlarmManager::GetInstance()->SendAlarm(SECONDARY_READ_WRITE_ALARM, string("write file error:") + bufferFileName + ", error:" + errorStr + ", nbytes:" + ToString(nbytes), flusher->mRegion, flusher->mProject, "", data->mLogstore); LOG_ERROR( sLogger, ("write meta of buffer file", "fail")("filename", bufferFileName)("errorStr", errorStr)("nbytes", nbytes)); delete[] buffer; fclose(fout); return false; } delete[] buffer; if (ftell(fout) > AppConfig::GetInstance()->GetLocalFileSize()) CreateNewFile(); fclose(fout); LOG_DEBUG(sLogger, ("write buffer file", bufferFileName)); return true; } SLSResponse DiskBufferWriter::SendBufferFileData(const sls_logs::LogtailBufferMeta& bufferMeta, const std::string& logData, std::string& host) { RateLimiter::FlowControl(bufferMeta.rawsize(), mSendLastTime, mSendLastByte, false); string region = bufferMeta.region(); #ifdef __ENTERPRISE__ // old buffer file which record the endpoint if (region.find("http://") == 0) { region = EnterpriseSLSClientManager::GetInstance()->GetRegionFromEndpoint(region); } #endif SLSClientManager::AuthType type; string accessKeyId, accessKeySecret; if (!SLSClientManager::GetInstance()->GetAccessKey(bufferMeta.aliuid(), type, accessKeyId, accessKeySecret)) { #ifdef __ENTERPRISE__ if (!EnterpriseSLSClientManager::GetInstance()->GetAccessKeyIfProjectSupportsAnonymousWrite( bufferMeta.project(), type, accessKeyId, accessKeySecret)) { SLSResponse response; response.mErrorCode = LOGE_UNAUTHORIZED; response.mErrorMsg = kAKErrorMsg; return response; } #endif } #ifdef __ENTERPRISE__ if (bufferMeta.endpointmode() == sls_logs::EndpointMode::DEFAULT) { EnterpriseSLSClientManager::GetInstance()->UpdateRemoteRegionEndpoints( region, {bufferMeta.endpoint()}, EnterpriseSLSClientManager::RemoteEndpointUpdateAction::APPEND); } auto info = EnterpriseSLSClientManager::GetInstance()->GetCandidateHostsInfo( region, bufferMeta.project(), GetEndpointMode(bufferMeta.endpointmode())); mCandidateHostsInfos.insert(info); host = info->GetCurrentHost(); if (host.empty()) { SLSResponse response; response.mErrorCode = LOGE_REQUEST_ERROR; response.mErrorMsg = kNoHostErrorMsg; return response; } #else host = bufferMeta.project() + "." + bufferMeta.endpoint(); #endif bool httpsFlag = SLSClientManager::GetInstance()->UsingHttps(region); RawDataType dataType; if (bufferMeta.datatype() == 0) { dataType = RawDataType::EVENT_GROUP_LIST; } else { dataType = RawDataType::EVENT_GROUP; } auto telemetryType = bufferMeta.has_telemetrytype() ? bufferMeta.telemetrytype() : sls_logs::SLS_TELEMETRY_TYPE_LOGS; switch (telemetryType) { case sls_logs::SLS_TELEMETRY_TYPE_LOGS: return PostLogStoreLogs(accessKeyId, accessKeySecret, type, host, httpsFlag, bufferMeta.project(), bufferMeta.logstore(), GetSLSCompressTypeString(bufferMeta.compresstype()), dataType, logData, bufferMeta.rawsize(), bufferMeta.has_shardhashkey() ? bufferMeta.shardhashkey() : ""); case sls_logs::SLS_TELEMETRY_TYPE_METRICS: return PostMetricStoreLogs(accessKeyId, accessKeySecret, type, host, httpsFlag, bufferMeta.project(), bufferMeta.logstore(), GetSLSCompressTypeString(bufferMeta.compresstype()), logData, bufferMeta.rawsize()); case sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS: case sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES: case sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS: return PostAPMBackendLogs(accessKeyId, accessKeySecret, type, host, httpsFlag, bufferMeta.project(), bufferMeta.logstore(), GetSLSCompressTypeString(bufferMeta.compresstype()), dataType, logData, bufferMeta.rawsize(), bufferMeta.subpath()); default: { // should not happen LOG_ERROR(sLogger, ("Unhandled telemetry type", " should not happen")); SLSResponse response; response.mErrorCode = LOGE_REQUEST_ERROR; response.mErrorMsg = "Unhandled telemetry type"; return response; } } } bool DiskBufferWriter::CheckBufferMetaValidation(const std::string& filename, const sls_logs::LogtailBufferMeta& bufferMeta) { if (bufferMeta.project().empty()) { LOG_ERROR(sLogger, ("send disk buffer fail", "project is empty")("filename", filename)); return false; } if (bufferMeta.aliuid().size() > 16) { LOG_ERROR(sLogger, ("send disk buffer fail", "aliuid size is too large")("filename", filename)("size", bufferMeta.aliuid().size())); return false; } if (sizeof(bufferMeta) > BUFFER_META_MAX_SIZE) { LOG_ERROR( sLogger, ("send disk buffer fail", "buffer meta is too large")("filename", filename)("size", sizeof(bufferMeta))); return false; } return true; } } // namespace logtail