core/plugin/flusher/sls/DiskBufferWriter.h (95 lines of code) (raw):

/* * Copyright 2024 iLogtail Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <cstdint> #include <ctime> #include <atomic> #include <condition_variable> #include <future> #include <memory> #include <mutex> #include <string> #include <vector> #include "collection_pipeline/queue/SenderQueueItem.h" #include "common/SafeQueue.h" #include "plugin/flusher/sls/SLSClientManager.h" #include "plugin/flusher/sls/SLSResponse.h" #include "protobuf/sls/logtail_buffer_meta.pb.h" #ifdef __ENTERPRISE__ #include <unordered_set> #include "plugin/flusher/sls/EnterpriseSLSClientManager.h" #endif namespace logtail { class DiskBufferWriter { public: DiskBufferWriter(const DiskBufferWriter&) = delete; DiskBufferWriter& operator=(const DiskBufferWriter&) = delete; static DiskBufferWriter* GetInstance() { static DiskBufferWriter instance; return &instance; } void Init(); void Stop(); bool PushToDiskBuffer(SenderQueueItem* item, uint32_t retryTimes); private: static const int32_t BUFFER_META_BASE_SIZE; static const size_t BUFFER_META_MAX_SIZE; struct EncryptionStateMeta { int32_t mLogDataSize; int32_t mEncryptionSize; int32_t mEncodedInfoSize; int32_t mTimeStamp; int32_t mHandled; int32_t mRetryTime; }; DiskBufferWriter() = default; ~DiskBufferWriter() = default; void BufferWriterThread(); void BufferSenderThread(); SLSResponse SendBufferFileData(const sls_logs::LogtailBufferMeta& bufferMeta, const std::string& logData, std::string& host); bool SendToBufferFile(SenderQueueItem* dataPtr); bool LoadFileToSend(time_t timeLine, std::vector<std::string>& filesToSend); bool CreateNewFile(); bool WriteBackMeta(const int32_t pos, const void* buf, int32_t length, const std::string& filename); bool ReadNextEncryption(int32_t& pos, const std::string& filename, std::string& encryption, EncryptionStateMeta& meta, bool& readResult, sls_logs::LogtailBufferMeta& bufferMeta); void SendEncryptionBuffer(const std::string& filename, int32_t keyVersion); void SetBufferFilePath(const std::string& bufferfilepath); std::string GetBufferFilePath(); std::string GetBufferFileName(); void SetBufferFileName(const std::string& filename); std::string GetBufferFileHeader(); bool CheckBufferMetaValidation(const std::string& filename, const sls_logs::LogtailBufferMeta& bufferMeta); SafeQueue<SenderQueueItem*> mQueue; std::future<void> mBufferWriterThreadRes; std::atomic_bool mIsFlush = false; std::future<void> mBufferSenderThreadRes; mutable std::mutex mBufferSenderThreadRunningMux; bool mIsSendBufferThreadRunning = true; mutable std::condition_variable mStopCV; #ifdef __ENTERPRISE__ struct PointerHash { std::size_t operator()(const std::shared_ptr<CandidateHostsInfo>& ptr) const { return std::hash<CandidateHostsInfo*>()(ptr.get()); } }; struct PointerEqual { bool operator()(const std::shared_ptr<CandidateHostsInfo>& lhs, const std::shared_ptr<CandidateHostsInfo>& rhs) const { return lhs.get() == rhs.get(); } }; std::unordered_set<std::shared_ptr<CandidateHostsInfo>, PointerHash, PointerEqual> mCandidateHostsInfos; #endif mutable std::mutex mBufferFileLock; std::string mBufferFilePath; std::string mBufferFileName; volatile time_t mBufferDivideTime = 0; int64_t mCheckPeriod = 0; int64_t mSendLastTime = 0; int32_t mSendLastByte = 0; }; } // namespace logtail