void LogFileReader::ReadUTF8()

in core/file_server/reader/LogFileReader.cpp [1521:1652]
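Reads the next chunk of UTF-8 data into logBuffer. The function merges any remainder cached by the previous read, optionally rolls back a trailing incomplete log so that only whole logs are emitted, caches the rolled-back tail for the next call, and advances mLastFilePos by the bytes actually consumed. If the file has already been closed (flush timeout), it drains mCache instead of reading.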


void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, bool tryRollback) {
    char* stringBuffer = nullptr;
    size_t nbytes = 0;

    logBuffer.readOffset = mLastFilePos;
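    // Two paths: if the file is already closed (flush timeout), drain mCache;
    // otherwise read from the file and merge with any cached remainder.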
    if (!mLogFileOp.IsOpen()) {
        // flush timeout: the file has already been closed, so drain the data cached by the previous read
        nbytes = mCache.size();
        StringBuffer stringMemory = logBuffer.sourcebuffer->AllocateStringBuffer(nbytes);
        stringBuffer = stringMemory.data;
        memcpy(stringBuffer, mCache.data(), nbytes);
        // Skip the leading '\n' if the previous read was a forced read
        if (stringBuffer[0] == '\n' && mLastForceRead) {
            ++stringBuffer;
            ++mLastFilePos;
            logBuffer.readOffset = mLastFilePos;
            --nbytes;
        }
        mLastForceRead = true;
        mCache.clear();
        moreData = false;
    } else {
        bool fromCpt = false;
        size_t READ_BYTE = getNextReadSize(end, fromCpt);
        if (!READ_BYTE) {
            return;
        }
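        // For container stdout input, detect the container log type once by inspecting the head of the file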
        if (mReaderConfig.first->mInputType == FileReaderOptions::InputType::InputContainerStdio
            && !mHasReadContainerBom) {
            checkContainerType(mLogFileOp);
        }
        const size_t lastCacheSize = mCache.size();
        if (READ_BYTE < lastCacheSize) {
            // should not happen; theoretically guards READ_BYTE -= lastCacheSize below against underflow
            READ_BYTE = lastCacheSize;
        }
        StringBuffer stringMemory
            = logBuffer.sourcebuffer->AllocateStringBuffer(READ_BYTE); // allocate modifiable buffer
        if (lastCacheSize) {
            READ_BYTE -= lastCacheSize; // reserve space to copy from cache if needed
        }
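        // Buffer layout from here on: [cached remainder][newly read bytes];
        // READ_BYTE now counts only the new bytes still to be read from the file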
        TruncateInfo* truncateInfo = nullptr;
        int64_t lastReadPos = GetLastReadPos();
        nbytes = READ_BYTE
            ? ReadFile(mLogFileOp, stringMemory.data + lastCacheSize, READ_BYTE, lastReadPos, &truncateInfo)
            : 0UL;
        stringBuffer = stringMemory.data;
        bool allowRollback = true;
        // Force read (disallow rollback) only when nothing new was read and rollback was not requested
        if (!tryRollback && nbytes == 0) {
            allowRollback = false;
        }
        // Nothing was read: if there is no cached data to flush, or rollback is still allowed,
        // the reader's state must not be changed
        if (nbytes == 0 && (!lastCacheSize || allowRollback)) {
            return;
        }
        if (lastCacheSize) {
            memcpy(stringBuffer, mCache.data(), lastCacheSize); // copy from cache
            nbytes += lastCacheSize;
        }
        // Skip the leading '\n' if the previous read was a forced read
        if (stringBuffer[0] == '\n' && mLastForceRead) {
            ++stringBuffer;
            ++mLastFilePos;
            logBuffer.readOffset = mLastFilePos;
            --nbytes;
        }
        mLastForceRead = !allowRollback;
        const size_t stringBufferLen = nbytes;
        logBuffer.truncateInfo.reset(truncateInfo);
        lastReadPos = mLastFilePos + nbytes; // NOTE: this may be wrong when ulogfs is used and a hole is skipped
        LOG_DEBUG(sLogger, ("read bytes", nbytes)("last read pos", lastReadPos));
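        // a completely filled buffer implies the file probably holds more data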
        moreData = (nbytes == BUFFER_SIZE);
        auto alignedBytes = nbytes;
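        // align to the last complete character so rollback never splits a multi-byte sequence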
        if (allowRollback) {
            alignedBytes = AlignLastCharacter(stringBuffer, nbytes);
        }
        if (allowRollback || mReaderConfig.second->RequiringJsonReader()) {
            int32_t rollbackLineFeedCount;
            nbytes = RemoveLastIncompleteLog(stringBuffer, alignedBytes, rollbackLineFeedCount, allowRollback);
        }
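        // after rollback nbytes counts only complete logs; zero means no complete log was found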

        if (nbytes == 0) {
            if (moreData) { // an excessively long line: no '\n', no multiline-begin match, or no valid wide character
                nbytes = alignedBytes ? alignedBytes : BUFFER_SIZE;
                if (mReaderConfig.second->RequiringJsonReader()) {
                    int32_t rollbackLineFeedCount;
                    nbytes = RemoveLastIncompleteLog(stringBuffer, nbytes, rollbackLineFeedCount, false);
                }
                LOG_WARNING(sLogger,
                            ("Log is too long and forced to be split at offset: ",
                             mLastFilePos + nbytes)("file: ", mHostLogPath)("inode: ", mDevInode.inode)(
                                "first 1024B log: ", logBuffer.rawBuffer.substr(0, 1024)));
                std::ostringstream oss;
                oss << "Log is too long and forced to be split at offset: " << ToString(mLastFilePos + nbytes)
                    << " file: " << mHostLogPath << " inode: " << ToString(mDevInode.inode)
                    << " first 1024B log: " << logBuffer.rawBuffer.substr(0, 1024) << std::endl;
                AlarmManager::GetInstance()->SendAlarm(
                    SPLIT_LOG_FAIL_ALARM, oss.str(), GetRegion(), GetProject(), GetConfigName(), GetLogstore());
            } else {
                // the line is not finished yet and there is no more data; put everything in the cache
                mCache.assign(stringBuffer, stringBufferLen);
                return;
            }
        }
        if (nbytes < stringBufferLen) {
            // rollback happened; put the rolled-back part in the cache
            mCache.assign(stringBuffer + nbytes, stringBufferLen - nbytes);
        } else {
            mCache.clear();
        }
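        // when resuming from a checkpoint, keep signalling moreData until the recorded end offset is reached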
        if (!moreData && fromCpt && lastReadPos < end) {
            moreData = true;
        }
    }

    // the cache is sealed; nbytes must not change from here on
    size_t stringLen = nbytes;
    if (stringLen > 0
        && (stringBuffer[stringLen - 1] == '\n'
            || stringBuffer[stringLen - 1]
                == '\0')) { // '\0' is for JSON; this behavior makes ilogtail unable to collect binary logs
        --stringLen;
    }
    stringBuffer[stringLen] = '\0';

    logBuffer.rawBuffer = StringView(stringBuffer, stringLen); // set readable buffer
    logBuffer.readLength = nbytes;
    setExactlyOnceCheckpointAfterRead(nbytes);
    mLastFilePos += nbytes;

    LOG_DEBUG(sLogger, ("read size", nbytes)("last file pos", mLastFilePos));
}
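
The cache-and-rollback scheme above is the heart of the function: each read emits only complete logs and stashes the trailing partial line in mCache, which is prepended to the next read. Below is a minimal standalone sketch of that scheme; KeepCompleteLines is a hypothetical stand-in for RemoveLastIncompleteLog in the plain single-line case, and a std::string models mCache.

#include <cstddef>
#include <iostream>
#include <string>

// Hypothetical stand-in for RemoveLastIncompleteLog in the single-line case:
// returns how many bytes form complete lines (up to and including the last '\n').
static size_t KeepCompleteLines(const std::string& buf) {
    size_t pos = buf.rfind('\n');
    return pos == std::string::npos ? 0 : pos + 1;
}

int main() {
    std::string cache; // models mCache
    for (const std::string& chunk : {std::string("foo\nbar"), std::string("baz\n")}) {
        std::string buf = cache + chunk; // merge the cached remainder, as ReadUTF8 does
        size_t kept = KeepCompleteLines(buf);
        cache.assign(buf, kept, buf.size() - kept); // the rolled-back tail goes to the cache
        std::cout << "emit " << kept << " bytes, cache \"" << cache << "\"\n";
    }
    // The first read emits "foo\n" and caches "bar"; the second read sees
    // "barbaz\n" and emits it whole, leaving the cache empty.
}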