in core/file_server/reader/LogFileReader.cpp [1521:1652]
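// Reads UTF-8 data for this reader: either drains mCache when the file handle is
// closed (flush-timeout path), or reads a fresh chunk from the file, prepends any
// cached leftover, optionally rolls back a trailing incomplete line, and publishes
// the resulting complete lines into logBuffer.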
void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, bool tryRollback) {
char* stringBuffer = nullptr;
size_t nbytes = 0;
logBuffer.readOffset = mLastFilePos;
if (!mLogFileOp.IsOpen()) {
// the file is not open: this is a flush-timeout read, served entirely from the cache
nbytes = mCache.size();
StringBuffer stringMemory = logBuffer.sourcebuffer->AllocateStringBuffer(nbytes);
stringBuffer = stringMemory.data;
memcpy(stringBuffer, mCache.data(), nbytes);
// Skip the leading '\n' if the previous read was a force read
if (stringBuffer[0] == '\n' && mLastForceRead) {
++stringBuffer;
++mLastFilePos;
logBuffer.readOffset = mLastFilePos;
--nbytes;
}
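// A flush-timeout read never rolls back, so record it as a force read; the next
// read will then skip the '\n' that completes the force-read line.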
mLastForceRead = true;
mCache.clear();
moreData = false;
} else {
bool fromCpt = false;
size_t READ_BYTE = getNextReadSize(end, fromCpt);
if (!READ_BYTE) {
return;
}
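// For container stdio input, run the one-time container log type detection
// (guarded by mHasReadContainerBom) before parsing the stream.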
if (mReaderConfig.first->mInputType == FileReaderOptions::InputType::InputContainerStdio
&& !mHasReadContainerBom) {
checkContainerType(mLogFileOp);
}
const size_t lastCacheSize = mCache.size();
if (READ_BYTE < lastCacheSize) {
READ_BYTE = lastCacheSize; // should not happen; theoretically keeps READ_BYTE - lastCacheSize from going below 0
}
StringBuffer stringMemory
= logBuffer.sourcebuffer->AllocateStringBuffer(READ_BYTE); // allocate modifiable buffer
if (lastCacheSize) {
READ_BYTE -= lastCacheSize; // reserve space to copy from cache if needed
}
TruncateInfo* truncateInfo = nullptr;
int64_t lastReadPos = GetLastReadPos();
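// Read fresh bytes after the space reserved for the cached prefix, so cached and
// newly read data end up in one contiguous buffer; any truncateInfo produced by
// ReadFile is attached to logBuffer below.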
nbytes = READ_BYTE
? ReadFile(mLogFileOp, stringMemory.data + lastCacheSize, READ_BYTE, lastReadPos, &truncateInfo)
: 0UL;
stringBuffer = stringMemory.data;
bool allowRollback = true;
// Force read (no rollback) only when rollback was not requested and nothing new was read
if (!tryRollback && nbytes == 0) {
allowRollback = false;
}
if (nbytes == 0 && (!lastCacheSize || allowRollback)) { // nothing was read; if there is no cached data, or
// rollback is still allowed, leave the reader's state unchanged
return;
}
if (lastCacheSize) {
memcpy(stringBuffer, mCache.data(), lastCacheSize); // copy from cache
nbytes += lastCacheSize;
}
// Skip the leading '\n' if the previous read was a force read
if (stringBuffer[0] == '\n' && mLastForceRead) {
++stringBuffer;
++mLastFilePos;
logBuffer.readOffset = mLastFilePos;
--nbytes;
}
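// Record whether this read was a force read (rollback disabled); see the
// leading-'\n' skip above for how the next read uses it.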
mLastForceRead = !allowRollback;
const size_t stringBufferLen = nbytes;
logBuffer.truncateInfo.reset(truncateInfo);
lastReadPos = mLastFilePos + nbytes; // this doesn't seem right when ulogfs is used and a hole is skipped
LOG_DEBUG(sLogger, ("read bytes", nbytes)("last read pos", lastReadPos));
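// A completely filled buffer is treated as a hint that the file still has pending data.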
moreData = (nbytes == BUFFER_SIZE);
auto alignedBytes = nbytes;
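// When rollback is allowed, AlignLastCharacter trims trailing bytes that do not
// form a complete UTF-8 character, so a character split at the read boundary is
// rolled back rather than emitted.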
if (allowRollback) {
alignedBytes = AlignLastCharacter(stringBuffer, nbytes);
}
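// Drop the trailing incomplete log so only complete lines are processed;
// rollbackLineFeedCount receives the number of line feeds rolled back, and the
// dropped tail is re-cached further below.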
if (allowRollback || mReaderConfig.second->RequiringJsonReader()) {
int32_t rollbackLineFeedCount;
nbytes = RemoveLastIncompleteLog(stringBuffer, alignedBytes, rollbackLineFeedCount, allowRollback);
}
if (nbytes == 0) {
if (moreData) { // excessively long line with no '\n', multiline beginning, or valid wide character
nbytes = alignedBytes ? alignedBytes : BUFFER_SIZE;
if (mReaderConfig.second->RequiringJsonReader()) {
int32_t rollbackLineFeedCount;
nbytes = RemoveLastIncompleteLog(stringBuffer, nbytes, rollbackLineFeedCount, false);
}
LOG_WARNING(sLogger,
("Log is too long and forced to be split at offset: ",
mLastFilePos + nbytes)("file: ", mHostLogPath)("inode: ", mDevInode.inode)(
"first 1024B log: ", logBuffer.rawBuffer.substr(0, 1024)));
std::ostringstream oss;
oss << "Log is too long and forced to be split at offset: " << ToString(mLastFilePos + nbytes)
<< " file: " << mHostLogPath << " inode: " << ToString(mDevInode.inode)
<< " first 1024B log: " << logBuffer.rawBuffer.substr(0, 1024) << std::endl;
AlarmManager::GetInstance()->SendAlarm(
SPLIT_LOG_FAIL_ALARM, oss.str(), GetRegion(), GetProject(), GetConfigName(), GetLogstore());
} else {
// the line is not finished and no more data is coming; cache all of it for the next read
mCache.assign(stringBuffer, stringBufferLen);
return;
}
}
if (nbytes < stringBufferLen) {
// rollback happened; keep the rolled-back tail in the cache
mCache.assign(stringBuffer + nbytes, stringBufferLen - nbytes);
} else {
mCache.clear();
}
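// When reading toward an end offset recorded in a checkpoint, keep signaling
// moreData until that offset has been reached.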
if (!moreData && fromCpt && lastReadPos < end) {
moreData = true;
}
}
// the cache is sealed; nbytes must not change from here on
size_t stringLen = nbytes;
if (stringLen > 0
&& (stringBuffer[stringLen - 1] == '\n'
|| stringBuffer[stringLen - 1]
== '\0')) { // '\0' is for JSON; this behavior makes ilogtail unable to collect binary logs
--stringLen;
}
stringBuffer[stringLen] = '\0';
logBuffer.rawBuffer = StringView(stringBuffer, stringLen); // set readable buffer
logBuffer.readLength = nbytes;
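// Advance the exactly-once checkpoint and the reader's file position by the bytes
// actually consumed from this read.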
setExactlyOnceCheckpointAfterRead(nbytes);
mLastFilePos += nbytes;
LOG_DEBUG(sLogger, ("read size", nbytes)("last file pos", mLastFilePos));
}