in core/file_server/reader/LogFileReader.cpp [1654:1827]
void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, bool tryRollback) {
    std::unique_ptr<char[]> gbkMemory;
    char* gbkBuffer = nullptr;
    size_t readCharCount = 0, originReadCount = 0;
    int64_t lastReadPos = 0;
    bool logTooLongSplitFlag = false, fromCpt = false;
    bool allowRollback = true;
    logBuffer.readOffset = mLastFilePos;
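    // Two entry paths: if the file handle is already closed, this is a flush-timeout
    // read that only drains mCache; otherwise fresh bytes are read from the file and
    // appended after whatever was cached by the previous read.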
    if (!mLogFileOp.IsOpen()) {
        // flush-timeout read: the file is closed, so consume the cached bytes only
        readCharCount = mCache.size();
        gbkMemory.reset(new char[readCharCount + 1]);
        gbkBuffer = gbkMemory.get();
        memcpy(gbkBuffer, mCache.data(), readCharCount);
        // Ignore the leading \n if the last read was a force read
        if (gbkBuffer[0] == '\n' && mLastForceRead) {
            ++gbkBuffer;
            ++mLastFilePos;
            logBuffer.readOffset = mLastFilePos;
            --readCharCount;
        }
        mLastForceRead = true;
        allowRollback = false;
        lastReadPos = mLastFilePos + readCharCount;
        originReadCount = readCharCount;
        moreData = false;
    } else {
        size_t READ_BYTE = getNextReadSize(end, fromCpt);
        const size_t lastCacheSize = mCache.size();
        if (READ_BYTE < lastCacheSize) {
            READ_BYTE = lastCacheSize; // should not happen; guards the subtraction of lastCacheSize below against underflow
        }
        gbkMemory.reset(new char[READ_BYTE + 1]);
        gbkBuffer = gbkMemory.get();
        if (lastCacheSize) {
            READ_BYTE -= lastCacheSize; // reserve space to copy from cache if needed
        }
        TruncateInfo* truncateInfo = nullptr;
        lastReadPos = GetLastReadPos();
        readCharCount
            = READ_BYTE ? ReadFile(mLogFileOp, gbkBuffer + lastCacheSize, READ_BYTE, lastReadPos, &truncateInfo) : 0UL;
        // Force read only when there is no new log and rollback is not being attempted
        if (!tryRollback && readCharCount == 0) {
            allowRollback = false;
        }
        if (readCharCount == 0 && (!lastCacheSize || allowRollback)) { // nothing new; just keep the last cache
            return;
        }
        if (mReaderConfig.first->mInputType == FileReaderOptions::InputType::InputContainerStdio
            && !mHasReadContainerBom) {
            checkContainerType(mLogFileOp);
        }
        if (lastCacheSize) {
            memcpy(gbkBuffer, mCache.data(), lastCacheSize); // prepend the bytes cached by the previous read
            readCharCount += lastCacheSize;
        }
        // Ignore the leading \n if the last read was a force read
        if (gbkBuffer[0] == '\n' && mLastForceRead) {
            ++gbkBuffer;
            --readCharCount;
            ++mLastFilePos;
            logBuffer.readOffset = mLastFilePos;
        }
        mLastForceRead = !allowRollback;
        logBuffer.truncateInfo.reset(truncateInfo);
        lastReadPos = mLastFilePos + readCharCount;
        originReadCount = readCharCount;
        moreData = (readCharCount == BUFFER_SIZE);
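        // A completely filled buffer implies more data is likely waiting. When rollback
        // is allowed, AlignLastCharacter trims a trailing partial multi-byte character
        // so the GBK -> UTF-8 conversion below never sees a split character.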
        auto alignedBytes = readCharCount;
        if (allowRollback) {
            alignedBytes = AlignLastCharacter(gbkBuffer, readCharCount);
        }
        if (alignedBytes == 0) {
            if (moreData) { // excessively long line without a valid wide character
                logTooLongSplitFlag = true;
                alignedBytes = BUFFER_SIZE;
            } else {
                // the line is not finished and there is no more data; cache everything
                mCache.assign(gbkBuffer, originReadCount);
                return;
            }
        }
        readCharCount = alignedBytes;
    }
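    // From here on both paths share the same pipeline: NUL-terminate the GBK buffer,
    // index the newlines, convert to UTF-8, roll back any incomplete trailing log, and
    // publish the result into logBuffer.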
    gbkBuffer[readCharCount] = '\0';
    vector<long> lineFeedPos = {-1}; // elements point to the last char of each line
    for (long idx = 0; idx < long(readCharCount - 1); ++idx) {
        if (gbkBuffer[idx] == '\n')
            lineFeedPos.push_back(idx);
    }
    lineFeedPos.push_back(readCharCount - 1);
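    // The converter is invoked twice: the first call with a null destination only
    // computes the required UTF-8 size, the second converts into the allocated buffer.
    // The newline offsets are passed along so the conversion can be done per line.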
    size_t srcLength = readCharCount;
    size_t requiredLen
        = EncodingConverter::GetInstance()->ConvertGbk2Utf8(gbkBuffer, &srcLength, nullptr, 0, lineFeedPos);
    StringBuffer stringMemory = logBuffer.sourcebuffer->AllocateStringBuffer(requiredLen + 1);
    size_t resultCharCount = EncodingConverter::GetInstance()->ConvertGbk2Utf8(
        gbkBuffer, &srcLength, stringMemory.data, stringMemory.capacity, lineFeedPos);
    char* stringBuffer = stringMemory.data; // utf8 buffer
    if (resultCharCount == 0) {
        if (readCharCount < originReadCount) {
            // skip the unconvertible part and keep the rolled-back bytes in cache
            mCache.assign(gbkBuffer + readCharCount, originReadCount - readCharCount);
        } else {
            mCache.clear();
        }
        mLastFilePos += readCharCount;
        logBuffer.readOffset = mLastFilePos;
        return;
    }
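    // Roll back any trailing incomplete log (e.g. an unfinished multiline or JSON
    // record) from the UTF-8 buffer and record how many line feeds were dropped, so the
    // consumed GBK byte count can be adjusted to match below.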
    int32_t rollbackLineFeedCount = 0;
    int32_t bakResultCharCount = resultCharCount;
    if (allowRollback || mReaderConfig.second->RequiringJsonReader()) {
        resultCharCount = RemoveLastIncompleteLog(stringBuffer, resultCharCount, rollbackLineFeedCount, allowRollback);
    }
    if (resultCharCount == 0) {
        if (moreData) {
            resultCharCount = bakResultCharCount;
            rollbackLineFeedCount = 0;
            if (mReaderConfig.second->RequiringJsonReader()) {
                int32_t rollbackLineFeedCount; // local throwaway; the outer count intentionally stays 0
                RemoveLastIncompleteLog(stringBuffer, resultCharCount, rollbackLineFeedCount, false);
            }
            // Cannot determine the split position here, so just mark a flag and send an alarm later
            logTooLongSplitFlag = true;
        } else {
            // the line is not finished and there is no more data; cache everything
            mCache.assign(gbkBuffer, originReadCount);
            return;
        }
    }
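    // Map the rolled-back UTF-8 line feeds back to GBK byte offsets via lineFeedPos so
    // that only fully consumed lines advance readCharCount (and hence mLastFilePos).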
    int32_t lineFeedCount = lineFeedPos.size();
    if (rollbackLineFeedCount > 0 && lineFeedCount >= (1 + rollbackLineFeedCount)) {
        readCharCount -= lineFeedPos[lineFeedCount - 1] - lineFeedPos[lineFeedCount - 1 - rollbackLineFeedCount];
    }
    if (readCharCount < originReadCount) {
        // rollback happened; keep the rolled-back bytes in cache
        mCache.assign(gbkBuffer + readCharCount, originReadCount - readCharCount);
    } else {
        mCache.clear();
    }
    // the cache is sealed; readCharCount must not change any more
    size_t stringLen = resultCharCount;
    if (stringLen > 0
        && (stringBuffer[stringLen - 1] == '\n'
            || stringBuffer[stringLen - 1]
                == '\0')) { // \0 is for json; trimming it also means ilogtail cannot collect binary logs
        --stringLen;
    }
    stringBuffer[stringLen] = '\0';
    // when the read was bounded by a checkpoint (fromCpt) and its end offset has not
    // been reached yet, more data remains
    if (mLogFileOp.IsOpen() && !moreData && fromCpt && lastReadPos < end) {
        moreData = true;
    }
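    // Publish the converted text: rawBuffer holds the UTF-8 view, while readLength and
    // mLastFilePos advance by the number of GBK bytes actually consumed from the file.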
    logBuffer.rawBuffer = StringView(stringBuffer, stringLen);
    logBuffer.readLength = readCharCount;
    setExactlyOnceCheckpointAfterRead(readCharCount);
    mLastFilePos += readCharCount;
    if (logTooLongSplitFlag) {
        LOG_WARNING(sLogger,
                    ("Log is too long and forced to be split at offset: ", mLastFilePos)("file: ", mHostLogPath)(
                        "inode: ", mDevInode.inode)("first 1024B log: ", logBuffer.rawBuffer.substr(0, 1024)));
        std::ostringstream oss;
        oss << "Log is too long and forced to be split at offset: " << ToString(mLastFilePos)
            << " file: " << mHostLogPath << " inode: " << ToString(mDevInode.inode)
            << " first 1024B log: " << logBuffer.rawBuffer.substr(0, 1024) << std::endl;
        AlarmManager::GetInstance()->SendAlarm(
            SPLIT_LOG_FAIL_ALARM, oss.str(), GetRegion(), GetProject(), GetConfigName(), GetLogstore());
    }
    LOG_DEBUG(sLogger,
              ("read gbk buffer, offset", mLastFilePos)("origin read", originReadCount)("at last read", readCharCount));
}