core/file_server/reader/LogFileReader.h (365 lines of code) (raw):
/*
* Copyright 2022 iLogtail Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <deque>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "checkpoint/RangeCheckpoint.h"
#include "collection_pipeline/queue/QueueKey.h"
#include "common/DevInode.h"
#include "common/EncodingConverter.h"
#include "common/FileInfo.h"
#include "common/LogFileOperator.h"
#include "common/StringTools.h"
#include "common/StringView.h"
#include "common/TimeUtil.h"
#include "common/memory/SourceBuffer.h"
#include "constants/TagConstants.h"
#include "file_server/FileDiscoveryOptions.h"
#include "file_server/FileServer.h"
#include "file_server/MultilineOptions.h"
#include "file_server/event/Event.h"
#include "file_server/reader/FileReaderOptions.h"
#include "logger/Logger.h"
#include "protobuf/sls/sls_logs.pb.h"
namespace logtail {
// Forward declarations for types that are only referenced by pointer or reference before their definition.
struct LogBuffer;
class LogFileReader;
class DevInode;

// Shared-ownership handle to a reader, and a double-ended collection of such handles.
using LogFileReaderPtr = std::shared_ptr<LogFileReader>;
using LogFileReaderPtrArray = std::deque<LogFileReaderPtr>;
struct LineInfo {
StringView data;
std::string dataRaw;
int32_t lineBegin;
int32_t lineEnd;
int32_t rollbackLineFeedCount;
bool fullLine;
int32_t forceRollbackLineFeedCount;
LineInfo(StringView data = StringView(),
int32_t lineBegin = 0,
int32_t lineEnd = 0,
int32_t rollbackLineFeedCount = 0,
bool fullLine = false,
int32_t forceRollbackLineFeedCount = 0)
: data(data),
lineBegin(lineBegin),
lineEnd(lineEnd),
rollbackLineFeedCount(rollbackLineFeedCount),
fullLine(fullLine),
forceRollbackLineFeedCount(forceRollbackLineFeedCount) {}
};
// Abstract base of the line parsers (ContainerdTextParser, DockerJsonFileParser, RawTextParser).
//
// Each parser owns a SourceBuffer and a StringBuffer allocated from it; the StringBuffer is exposed through
// GetStringBuffer().
class BaseLineParse {
public:
    // Allocates the parser's own SourceBuffer and carves a StringBuffer of `size + 1` out of it.
    // Explicit: a bare size_t must never convert implicitly into a parser.
    explicit BaseLineParse(size_t size)
        : mSourceBuffer(std::make_unique<SourceBuffer>()),
          mStringBuffer(mSourceBuffer->AllocateStringBuffer(size + 1)) {}

    // Polymorphic base: the destructor must be virtual so that destroying a derived parser through a
    // BaseLineParse* is well defined.
    virtual ~BaseLineParse() = default;

    // Declaring the destructor suppresses the implicit move operations; default them explicitly so the class
    // stays exactly as movable as it was before (copying remains unavailable because of the unique_ptr member).
    BaseLineParse(BaseLineParse&&) = default;
    BaseLineParse& operator=(BaseLineParse&&) = default;

    // Parser-specific extraction of a line from `buffer`.
    // NOTE(review): presumably returns the last line ending at or before `end`; `protocolFunctionIndex` looks
    // like the index of this parser inside `lineParsers` - confirm against the implementations.
    virtual LineInfo GetLastLine(StringView buffer,
                                 int32_t end,
                                 size_t protocolFunctionIndex,
                                 bool needSingleLine,
                                 std::vector<BaseLineParse*>* lineParsers)
        = 0;

    StringBuffer* GetStringBuffer();

private:
    // Declaration order matters: mStringBuffer is allocated from mSourceBuffer in the constructor.
    std::unique_ptr<SourceBuffer> mSourceBuffer;
    StringBuffer mStringBuffer;
};
// Line parser selected for LogFormat::CONTAINERD_TEXT files.
// NOTE(review): format details live in the implementation file; confirm there.
class ContainerdTextParser : public BaseLineParse {
public:
    // `size` is forwarded to BaseLineParse, which sizes the parser's string buffer from it.
    ContainerdTextParser(size_t size) : BaseLineParse(size) {}

    LineInfo GetLastLine(StringView buffer,
                         int32_t end,
                         size_t protocolFunctionIndex,
                         bool needSingleLine,
                         std::vector<BaseLineParse*>* lineParsers) override;

    // Parses `rawLine` and writes the outcome into `paseLine`.
    void parseLine(LineInfo rawLine, LineInfo& paseLine);

    // Folds `additionalLine` into `resultLine`.
    // NOTE(review): `shouldResetBuffer` presumably controls whether the string buffer is restarted - confirm.
    void mergeLines(LineInfo& resultLine, const LineInfo& additionalLine, bool shouldResetBuffer);
};
// Line parser selected for LogFormat::DOCKER_JSON_FILE files.
// NOTE(review): format details live in the implementation file; confirm there.
class DockerJsonFileParser : public BaseLineParse {
public:
    // `size` is forwarded to BaseLineParse, which sizes the parser's string buffer from it.
    DockerJsonFileParser(size_t size) : BaseLineParse(size) {}

    LineInfo GetLastLine(StringView buffer,
                         int32_t end,
                         size_t protocolFunctionIndex,
                         bool needSingleLine,
                         std::vector<BaseLineParse*>* lineParsers) override;

    // Parses `rawLine` into `paseLine`.
    // NOTE(review): the bool result presumably reports whether parsing succeeded - confirm.
    bool parseLine(LineInfo rawLine, LineInfo& paseLine);
};
// Line parser selected for plain text content (LogFormat::TEXT).
// NOTE(review): it may also serve as the innermost parser for container formats - confirm in the implementation.
class RawTextParser : public BaseLineParse {
public:
    // `size` is forwarded to BaseLineParse, which sizes the parser's string buffer from it.
    RawTextParser(size_t size) : BaseLineParse(size) {}

    LineInfo GetLastLine(StringView buffer,
                         int32_t end,
                         size_t protocolFunctionIndex,
                         bool needSingleLine,
                         std::vector<BaseLineParse*>* lineParsers) override;

    // Variant of GetLastLine that takes neither the single-line flag nor the parser chain.
    LineInfo parse(StringView buffer, int32_t end, size_t protocolFunctionIndex);
};
// Only reads the log file that is currently being written: it chooses the last modified file to read. The last
// modified file is (re)chosen under either of these conditions:
// 1. the last read file does not exist
// 2. the file's first 100 bytes (the file signature) differ from the last read file's signature, which means the
// log file has been rolled
//
// When a new file is chosen, the read position is set as follows:
// 1. if the time in the file's first line >= the last read log time, set the file read position to 0 (which means
// the file is newly created)
// 2. otherwise, set the position to the end of the file
// *bufferptr is null terminated.
/*
* 1. for multiline log, "xxx" mean a string without '\n'
* 1-1. bufferSize = 512KB:
* "MultiLineLog_1\nMultiLineLog_2\nMultiLineLog_3\n" -> "MultiLineLog_1\nMultiLineLog_2\0"
* "MultiLineLog_1\nMultiLineLog_2\nMultiLineLog_3_Line_1\n" -> "MultiLineLog_1\nMultiLineLog_2\0"
* "MultiLineLog_1\nMultiLineLog_2\nMultiLineLog_3_Line_1\nxxx" -> "MultiLineLog_1\nMultiLineLog_2\0"
* "MultiLineLog_1\nMultiLineLog_2\nMultiLineLog_3\nxxx" -> "MultiLineLog_1\nMultiLineLog_2\0"
*
* 1-2. bufferSize < 512KB:
* "MultiLineLog_1\nMultiLineLog_2\nMultiLineLog_3\n" -> "MultiLineLog_1\nMultiLineLog_2\nMultiLineLog_3\0"
* "MultiLineLog_1\nMultiLineLog_2\nMultiLineLog_3_Line_1\n" ->
* "MultiLineLog_1\nMultiLineLog_2\nMultiLineLog_3_Line_1\0" **this is not expected !**
* "MultiLineLog_1\nMultiLineLog_2\nMultiLineLog_3_Line_1\nxxx" -> "MultiLineLog_1\nMultiLineLog_2\0"
* "MultiLineLog_1\nMultiLineLog_2\nMultiLineLog_3\nxxx" -> "MultiLineLog_1\nMultiLineLog_2\0"
*
* 2. for singleline log, "xxx" mean a string without '\n'
* "SingleLineLog_1\nSingleLineLog_2\nSingleLineLog_3\n" -> "SingleLineLog_1\nSingleLineLog_2\nSingleLineLog_3\0"
* "SingleLineLog_1\nSingleLineLog_2\nxxx" -> "SingleLineLog_1\nSingleLineLog_2\0"
*/
// Reader for a single log file. It tracks the file identity (dev/inode, signature), the consumed read position
// and a cache of data read but not yet consumed, plus container/topic tags and exactly-once checkpoint state.
// Data is pulled through ReadLog() into a LogBuffer.
class LogFileReader {
public:
    enum class LogFormat { TEXT, CONTAINERD_TEXT, DOCKER_JSON_FILE };
    LogFormat mFileLogFormat = LogFormat::TEXT;
    static size_t BUFFER_SIZE;
    // Values stored in mIdxInReaderArrayFromLastCpt when the reader has no index in a reader array.
    static const int32_t CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY = -1;
    static const int32_t CHECKPOINT_IDX_OF_NOT_IN_READER_ARRAY = -2;
    // Non-owning pointers to parsers (see GetParser).
    std::vector<BaseLineParse*> mLineParsers = {};

    // Returns a pointer to the parser instance of type T that belongs to the calling thread.
    // The instance is a function-local thread_local static, so `size` is only used the first time this is
    // called for a given T on a given thread; later calls ignore it and return the same object.
    template <typename T>
    T* GetParser(size_t size) {
        thread_local static T sParse = T(size);
        return &sParse;
    }

    enum FileCompareResult {
        FileCompareResult_DevInodeChange,
        FileCompareResult_SigChange,
        FileCompareResult_SigSameSizeChange,
        FileCompareResult_SigSameSizeSame,
        FileCompareResult_Error
    };

    enum FileReadPolicy {
        BACKWARD_TO_BEGINNING,
        BACKWARD_TO_BOOT_TIME,
        BACKWARD_TO_FIXED_POS,
    };

    static LogFileReader* CreateLogFileReader(const std::string& hostLogPathDir,
                                              const std::string& hostLogPathFile,
                                              const DevInode& devInode,
                                              const FileReaderConfig& readerConfig,
                                              const MultilineConfig& multilineConfig,
                                              const FileDiscoveryConfig& discoveryConfig,
                                              const FileTagConfig& tagConfig,
                                              uint32_t exactlyonceConcurrency,
                                              bool forceFromBeginning);

    static PipelineEventGroup GenerateEventGroup(LogFileReaderPtr reader, LogBuffer* logBuffer);

    LogFileReader(const std::string& hostLogPathDir,
                  const std::string& hostLogPathFile,
                  const DevInode& devInode,
                  const FileReaderConfig& readerConfig,
                  const MultilineConfig& multilineConfig,
                  const FileTagConfig& tagConfig);

    bool ReadLog(LogBuffer& logBuffer, const Event* event);

    time_t GetLastUpdateTime() const // actually it's the time whenever ReadLogs is called
    {
        return mLastUpdateTime;
    }

    // Moved to MultilineOptions
    // // this function should only be called once
    // void SetLogMultilinePolicy(const std::string& begReg, const std::string& conReg, const std::string& endReg);
    // bool IsMultiLine() { return mLogBeginRegPtr != NULL || mLogContinueRegPtr != NULL || mLogEndRegPtr != NULL; }
    // void SetReaderFlushTimeout(int timeout) { mReaderFlushTimeout = timeout; }

    std::string GetTopicName(const std::string& topicConfig, const std::string& path);
    void SetTopicName(const std::string& topic) { mTopicName = topic; }

    FileCompareResult CompareToFile(const std::string& filePath);

    virtual int32_t
    RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& rollbackLineFeedCount, bool allowRollback = true);

    size_t AlignLastCharacter(char* buffer, size_t size);

    virtual ~LogFileReader();

    // const std::string& GetRegion() const { return mRegion; }
    // void SetRegion(const std::string& region) { mRegion = region; }
    // const std::string& GetConfigName() const { return mConfigName; }
    // void SetConfigName(const std::string& configName) { mConfigName = configName; }
    // const std::string& GetProjectName() const { return mProjectName; }
    const std::string& GetTopicName() const { return mTopicName; }
    // const std::string& GetCategory() const { return mCategory; }

    /// @return e.g. `/logtail_host/var/xxx/home/admin/access.log`,
    const std::string& GetHostLogPath() const { return mHostLogPath; }

    bool GetSymbolicLinkFlag() const { return mSymbolicLinkFlag; }

    /// @return e.g. `/home/admin/access.log`
    const std::string& GetConvertedPath() const;

    const std::string& GetHostLogPathFile() const { return mHostLogPathFile; }

    int64_t GetFileSize() const { return mLastFileSize; }

    int64_t GetLastFilePos() const { return mLastFilePos; }

    int32_t GetIdxInReaderArrayFromLastCpt() const { return mIdxInReaderArrayFromLastCpt; }
    void SetIdxInReaderArrayFromLastCpt(int32_t idx) { mIdxInReaderArrayFromLastCpt = idx; }

    void ResetLastFilePos() { mLastFilePos = 0; }

    bool NeedSkipFirstModify() const { return mSkipFirstModify; }
    void DisableSkipFirstModify() { mSkipFirstModify = false; }

    void SetReadFromBeginning();

    // fuse, deprecated
    // bool SetReadPosForBackwardReading(LogFileOperator& op);

    // Sets the consumed position. A positive position also clears the first-watched flag.
    void SetLastFilePos(int64_t pos) {
        if (pos > 0) {
            mFirstWatched = false;
        }
        mLastFilePos = pos;
    }

    void
    InitReader(bool tailExisted = false, FileReadPolicy policy = BACKWARD_TO_FIXED_POS, uint32_t eoConcurrency = 0);

    void DumpMetaToMem(bool checkConfigFlag = false, int32_t idxInReaderArray = CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY);

    std::string GetSourceId() const { return mSourceId; }

    bool IsFileDeleted() const { return mFileDeleted; }
    void SetFileDeleted(bool flag);
    time_t GetDeletedTime() const { return mDeletedTime; }

    bool IsContainerStopped() const { return mContainerStopped; }
    void SetContainerStopped();
    time_t GetContainerStoppedTime() const { return mContainerStoppedTime; }

    std::string GetContainerID() const { return mContainerID; }
    void SetContainerID(const std::string& containerID) { mContainerID = containerID; }
    bool UpdateContainerInfo();

    bool IsFileOpened() const { return mLogFileOp.IsOpen(); }

    bool ShouldForceReleaseDeletedFileFd();

    // void SetPluginFlag(bool flag) { mPluginFlag = flag; }
    // bool GetPluginFlag() const { return mPluginFlag; }

    void OnOpenFileError();

    // if update file ptr return false, then we should delete this reader
    bool UpdateFilePtr();

    bool CloseTimeoutFilePtr(int32_t curTime);

    bool CheckDevInode();

    bool CheckFileSignatureAndOffset(bool isOpenOnUpdate);

    // Records `filePath` as the real log path, but only when it differs from the host log path.
    void UpdateLogPath(const std::string& filePath) {
        if (mHostLogPath == filePath) {
            return;
        }
        mRealLogPath = filePath;
    }

    void SetSymbolicLinkFlag(bool flag) { mSymbolicLinkFlag = flag; }

    void CloseFilePtr();

    // void SetLogstoreKey(uint64_t logstoreKey) { mLogstoreKey = logstoreKey; }

    // Return the key of queues into which next read data will push.
    //
    // For normal reader, there is only one key for all read data, just return it.
    //
    // For exactly once reader, there are N keys, N is named concurrency, each
    // read data can only enter specified queue.
    // When one of the concurrency is blocked, concurrencies after it will also be
    // blocked. For example, N is 8, concurrency 0-7, concurrency 1 is blocked, then
    // next read of concurrency 2-7 will also be blocked.
    uint64_t GetLogstoreKey() const;

    void SetDevInode(const DevInode& devInode) { mDevInode = devInode; }
    DevInode GetDevInode() const { return mDevInode; }

    const std::string& GetRealLogPath() const { return mRealLogPath; }

    // void SetTimeFormat(const std::string& timeFormat) { mTimeFormat = timeFormat; }
    // std::string GetTimeFormat() const { return mTimeFormat; }

    // True when everything up to the last known file size has been read (consumed or cached).
    bool IsReadToEnd() const { return GetLastReadPos() == mLastFileSize; }

    // True when data has been read from the file but is still held in the cache.
    bool HasDataInCache() const { return !mCache.empty(); }

    LogFileReaderPtrArray* GetReaderArray();
    void SetReaderArray(LogFileReaderPtrArray* readerArray);

    // // some Reader will override these functions (eg. JsonLogFileReader)
    // virtual bool ParseLogLine(StringView buffer,
    // sls_logs::LogGroup& logGroup,
    // ParseLogError& error,
    // LogtailTime& lastLogLineTime,
    // std::string& lastLogTimeStr,
    // uint32_t& logGroupSize)
    // = 0;
    // virtual bool LogSplit(const char* buffer,
    // int32_t size,
    // int32_t& lineFeed,
    // std::vector<StringView>& logIndex,
    // std::vector<StringView>& discardIndex);

    // True when both the stored signature hash and the stored signature size are non-zero.
    bool IsFromCheckPoint() const { return mLastFileSignatureHash != 0 && mLastFileSignatureSize > 0U; }

    // void SetDelayAlarmBytes(int64_t value) { mReadDelayAlarmBytes = value; }
    // int64_t GetPackId() { return ++mPackId; }

    void SetDockerPath(const std::string& dockerBasePath, size_t dockerReplaceSize);

    const std::vector<std::pair<std::string, std::string>>& GetTopicExtraTags() const { return mTopicExtraTags; }

    const std::vector<std::pair<TagKey, std::string>>& GetContainerMetadatas() const { return mContainerMetadatas; }
    void SetContainerMetadatas(const std::vector<std::pair<TagKey, std::string>>& tags) { mContainerMetadatas = tags; }

    // Note: despite the generic name, this returns the container extra tags.
    const std::vector<std::pair<std::string, std::string>>& GetExtraTags() const { return mContainerExtraTags; }
    void SetContainerExtraTags(const std::vector<std::pair<std::string, std::string>>& tags) {
        mContainerExtraTags = tags;
    }

    QueueKey GetQueueKey() const { return mReaderConfig.second->GetProcessQueueKey(); }

    // void SetDelaySkipBytes(int64_t value) { mReadDelaySkipBytes = value; }
    // void SetFuseMode(bool fusemode) { mIsFuseMode = fusemode; }
    // bool GetFuseMode() const { return mIsFuseMode; }
    // void SetMarkOffsetFlag(bool markOffsetFlag) { mMarkOffsetFlag = markOffsetFlag; }
    // bool GetMarkOffsetFlag() const { return mMarkOffsetFlag; }
    // void SetFuseTrimedFilename(const std::string& filename) { mFuseTrimedFilename = filename; }
    // std::string GetFuseTrimedFilename() const { return mFuseTrimedFilename; }

    void ResetTopic(const std::string& topicFormat);

    // SetReadBufferSize set reader buffer size, which controls the max size of single log.
    static void SetReadBufferSize(int32_t bufSize);

    // void SetSpecifiedYear(int32_t y) { mSpecifiedYear = y; }
    // void SetCloseUnusedInterval(int32_t interval) { mCloseUnusedInterval = interval; }
    // void SetPreciseTimestampConfig(bool enabled, const std::string& key, TimeStampUnit unit) {
    // mPreciseTimestampConfig.enabled = enabled;
    // mPreciseTimestampConfig.key = key;
    // mPreciseTimestampConfig.unit = unit;
    // }
    // void SetTzOffsetSecond(bool tzAdjust = false, int logTzOffsetSecond = 0) {
    // if (tzAdjust) {
    // mTzOffsetSecond = logTzOffsetSecond - GetLocalTimeZoneOffsetSecond();
    // } else {
    // mTzOffsetSecond = 0;
    // }
    // }
    // void SetAdjustApsaraMicroTimezone(bool adjustApsaraMicroTimezone = false) {
    // mAdjustApsaraMicroTimezone = adjustApsaraMicroTimezone;
    // }

    std::unique_ptr<Event> CreateFlushTimeoutEvent();

    const std::string& GetProject() const { return mProject; }
    const std::string& GetLogstore() const { return mLogstore; }
    const std::string& GetRegion() const { return mRegion; }
    const std::string& GetConfigName() const { return mConfigName; }

    FileReaderOptions::InputType GetInputType() const { return mReaderConfig.first->mInputType; }

    void SetEventGroupMetaAndTag(PipelineEventGroup& group);

    void SetMetrics();
    void ReportMetrics(uint64_t readSize);

protected:
    bool GetRawData(LogBuffer& logBuffer, int64_t fileSize, bool tryRollback = true);
    void ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, bool tryRollback = true);
    void ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, bool tryRollback = true);

    size_t ReadFile(
        LogFileOperator& logFileOp, void* buf, size_t size, int64_t& offset, TruncateInfo** truncateInfo = nullptr);

    static int32_t ParseTime(const char* buffer, const std::string& timeFormat);
    void SetFilePosBackwardToFixedPos(LogFileOperator& logFileOp);

    bool CheckForFirstOpen(FileReadPolicy policy = BACKWARD_TO_FIXED_POS);
    void FixLastFilePos(LogFileOperator& logFileOp, int64_t endOffset);

    inline int64_t GetLastReadPos() const { // pos read but may not consumed, used for read needed
        return mLastFilePos + mCache.size();
    }

    // std::string mRegion;
    // std::string mCategory;
    // std::string mConfigName;
    std::string mHostLogPath;
    std::string mHostLogPathDir;
    std::string mHostLogPathFile;
    std::string mRealLogPath; // real log path
    bool mSymbolicLinkFlag = false;
    std::string mSourceId;
    // int32_t mTailLimit; // KB
    uint64_t mLastFileSignatureHash = 0;
    uint32_t mLastFileSignatureSize = 0;
    int64_t mLastFilePos = 0; // pos read and consumed, used for next read begin
    int64_t mLastFileSize = 0;
    time_t mLastMTime = 0;
    std::string mCache;
    // >= 0: index of reader array, -1: new reader, -2: not in reader array
    int32_t mIdxInReaderArrayFromLastCpt = CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY;
    // std::string mProjectName;
    std::string mTopicName;
    // Zero-initialized so the member always has a defined value, like the other scalar members.
    time_t mLastUpdateTime = 0;
    // boost::regex* mLogBeginRegPtr;
    // boost::regex* mLogContinueRegPtr;
    // boost::regex* mLogEndRegPtr;
    // int mReaderFlushTimeout;
    bool mLastForceRead = false;
    // FileEncoding mFileEncoding;
    // bool mDiscardUnmatch;
    // LogType mLogType;
    DevInode mDevInode;
    bool mFirstWatched = true;
    bool mFileDeleted = false;
    time_t mDeletedTime = 0;
    bool mContainerStopped = false;
    std::string mContainerID;
    time_t mContainerStoppedTime = 0;
    time_t mReadStoppedContainerAlarmTime = 0;
    int32_t mReadDelayTime = 0;
    bool mSkipFirstModify = false;
    // int64_t mReadDelayAlarmBytes;
    // bool mPluginFlag;
    // int64_t mPackId;
    // int64_t mReadDelaySkipBytes; // if <=0, discard it, default 0.
    // last time when process modify event, updated in check file sig
    // (zero-initialized so the member always has a defined value)
    int32_t mLastEventTime = 0;
    // int32_t mSpecifiedYear;
    // bool mIsFuseMode = false;
    // bool mMarkOffsetFlag = false;
    // std::string mTimeFormat; // for backward reading
    LogFileOperator mLogFileOp; // encapsulate fuse & non-fuse mode
    // std::string mFuseTrimedFilename;
    LogFileReaderPtrArray* mReaderArray = nullptr;
    // uint64_t mLogstoreKey;

    // mHostLogPath is `/logtail_host/var/xxx/home/admin/access.log`,
    // mDockerPath is `/home/admin/access.log`
    // we should use mDockerPath to extract topic and set it to __tag__:__path__
    std::string mDockerPath;

    // tags
    std::vector<std::pair<std::string, std::string>> mTopicExtraTags;
    std::vector<std::pair<TagKey, std::string>> mContainerMetadatas;
    std::vector<std::pair<std::string, std::string>> mContainerExtraTags;

    // int32_t mCloseUnusedInterval;
    // PreciseTimestampConfig mPreciseTimestampConfig;
    // int32_t mTzOffsetSecond;
    // bool mAdjustApsaraMicroTimezone;

    FileReaderConfig mReaderConfig;
    MultilineConfig mMultilineConfig;
    FileTagConfig mTagConfig;
    // int64_t mLogGroupKey = 0;

    // since reader is destructed after the corresponding pipeline is removed, pipeline context used in destructor
    // should be copied explicitly from context.
    std::string mProject;
    std::string mLogstore;
    std::string mConfigName;
    std::string mRegion;

    MetricLabels mMetricLabels;
    ReentrantMetricsRecordRef mMetricsRecordRef;
    CounterPtr mOutEventsTotal;
    CounterPtr mOutEventGroupsTotal;
    CounterPtr mOutSizeBytes;
    IntGaugePtr mSourceSizeBytes;
    IntGaugePtr mSourceReadOffsetBytes;

private:
    bool mHasReadContainerBom = false;
    void checkContainerType();
    void checkContainerType(LogFileOperator& op);

    // Initialized when the exactly once feature is enabled.
    struct ExactlyOnceOption {
        std::string primaryCheckpointKey;
        QueueKey fbKey;
        PrimaryCheckpointPB primaryCheckpoint;
        std::vector<RangeCheckpointPtr> rangeCheckpointPtrs;

        // Checkpoint for current read, maybe an existing checkpoint for replay or
        // new checkpoint.
        RangeCheckpointPtr selectedCheckpoint;

        // Uncommitted checkpoints to replay.
        // Initialized by adjustParametersByRangeCheckpoints,
        // Used by selectCheckpointToReplay.
        std::deque<RangeCheckpointPtr> toReplayCheckpoints;
        // Recovered from checkpoints.
        int64_t lastComittedOffset = -1;

        uint32_t concurrency = 8;
    };
    std::shared_ptr<ExactlyOnceOption> mEOOption;

    // Select next checkpoint to recover from toReplayCheckpoints.
    // Called before read.
    //
    // It will check if the checkpoint to replay matches current file, mismatch cases:
    // - Mismatch between read offset and mLastFilePos.
    // - Mismatch between read length and the length of available file content.
    // If mismatch happened, all replay checkpoints will be dropped.
    //
    // @return nullptr if no more checkpoint need to be replayed.
    RangeCheckpointPtr selectCheckpointToReplay();

    // Adjust offset to skip hole for next checkpoint or last committed offset.
    // Called after read if current checkpoint is from replay checkpoints.
    //
    // Skip hole cases (compare to mLastFilePos):
    // 1. Next checkpoint's offset is bigger.
    // 2. No more checkpoint, but the last committed offset is bigger.
    void skipCheckpointRelayHole();

    // Return the size to read for following read.
    //
    // Complete selected checkpoint means that it is a replay checkpoint, so
    // next read size should be specified.
    //
    // @param fileEnd: file size, ie. tell(seek(end)).
    // @param fromCpt: if the read size is recovered from checkpoint, set it to true.
    size_t getNextReadSize(int64_t fileEnd, bool& fromCpt);

    LineInfo GetLastLine(StringView buffer, int32_t end, bool needSingleLine = false);

    // Update current checkpoint's read offset and length after success read.
    void setExactlyOnceCheckpointAfterRead(size_t readSize);

    // Return primary key of current reader by combining meta:
    // "<config name>-<host log path>-<dev>-<inode>".
    //
    // Conflict resolve: file signature will be stored in primary checkpoint.
    std::string makePrimaryCheckpointKey() {
        std::string key;
        key.append(mReaderConfig.second->GetConfigName())
            .append("-")
            .append(mHostLogPath)
            .append("-")
            .append(std::to_string(mDevInode.dev))
            .append("-")
            .append(std::to_string(mDevInode.inode));
        return key;
    }

    // Initialize exactly once related member variables.
    void initExactlyOnce(uint32_t concurrency);

    // Validate if the primary checkpoint is ok to use by signature.
    //
    // Signature can be find in three place:
    // 1. Checkpoint v1: /tmp/logtail_check_point.
    // 2. Checkpoint v2: checkpoint database.
    // 3. Read file data and calculate.
    //
    // Validation procedure:
    // 1. Sig is found in checkpoint v2, read file data to validate.
    // 2. Sig is found in checkpoint v1, ignore the v2 checkpoint.
    // 3. Sig not found: ignore the checkpoint v2. It is safe because sig
    // will be written before first read.
    //
    // By the way, if the v2 checkpoint is valid, set mLastFileSignatureSize/Hash.
    bool validatePrimaryCheckpoint(const PrimaryCheckpointPB& cpt);

    // Adjust parameters for first read by range checkpoints.
    //
    // Includes:
    // - mLastFilePos, GetLastReadPos
    // - mFirstWatched
    // - mEOOption->toReplayCheckpoints
    // - mEOOption->lastComittedOffset
    // These parameters will control the ReadLog.
    //
    // Replay Checkpoints Selection Algorithm
    // 1. Iterate all range checkpoints to find all uncommitted checkpoints.
    // 2. Iterate all range checkpoints to find the one with maximum read offset.
    // 3. Update member variables:
    // 3.1. nothing is found, use default value.
    // 3.2. uncommitted checkpoints are found:
    // - Sort them by read offset
    // - Set them as toReplayCheckpoints
    // - Set GetLastReadPos = mLastFilePos = ReadOffsetOfFirstCheckpoint
    // - Set mFirstWatched = false
    // - If maximum is found and the checkpoint is committed, use it to update
    // the last committed offset
    // 3.3. no minimum but maximum is found, it means all data in checkpoints have
    // already been committed, so read should start from the position after it.
    // - Set GetLastReadPos = mLastFilePos = ReadOffset(maxOffset) + ReadLength(maxOffset)
    // - Set mFirstWatched = false
    void adjustParametersByRangeCheckpoints();

    // Update primary checkpoint when meta updated.
    void updatePrimaryCheckpointSignature();
    void updatePrimaryCheckpointRealPath();

    void handleUnmatchLogs(const char* buffer,
                           int& multiBeginIndex,
                           int endIndex,
                           std::vector<StringView>& logIndex,
                           std::vector<StringView>& discardIndex);

#ifdef APSARA_UNIT_TEST_MAIN
    friend class EventDispatcherTest;
    friend class LogFileReaderUnittest;
    friend class LogMultiBytesUnittest;
    friend class ExactlyOnceReaderUnittest;
    friend class SenderUnittest;
    friend class AppConfigUnittest;
    friend class ModifyHandlerUnittest;
    friend class LogSplitUnittest;
    friend class LogSplitDiscardUnmatchUnittest;
    friend class LogSplitNoDiscardUnmatchUnittest;
    friend class RemoveLastIncompleteLogMultilineUnittest;
    friend class LogFileReaderCheckpointUnittest;
    friend class GetLastLineUnittest;
    friend class LastMatchedContainerdTextLineUnittest;
    friend class LastMatchedDockerJsonFileUnittest;
    friend class LastMatchedContainerdTextWithDockerJsonUnittest;
    friend class ForceReadUnittest;
    friend class FileTagUnittest;

protected:
    void UpdateReaderManual();
#endif
};
struct LogBuffer {
StringView rawBuffer;
LogFileReaderPtr logFileReader;
FileInfoPtr fileInfo;
TruncateInfoPtr truncateInfo;
// Current buffer's checkpoint, for exactly once feature.
RangeCheckpointPtr exactlyOnceCheckpoint;
// Current buffer's offset in file, for log position meta feature.
uint64_t readOffset = 0;
uint64_t readLength = 0;
std::unique_ptr<SourceBuffer> sourcebuffer;
LogBuffer() : sourcebuffer(new SourceBuffer()) {}
void SetDependecy(const LogFileReaderPtr& reader) { logFileReader = reader; }
};
} // namespace logtail