// LogFileReader::ReadGBK — excerpt from core/file_server/reader/LogFileReader.cpp [1654:1827]

/// @brief Read a chunk of GBK-encoded log data, convert it to UTF-8, and publish
///        the complete log lines into `logBuffer`.
///
/// Input comes from one of two places:
///   - file handle closed (flush-timeout force read): consume whatever is left in mCache;
///   - file handle open: read up to getNextReadSize() bytes, prepending any cached remainder.
/// A trailing incomplete log line may be rolled back and kept in mCache for the next
/// call; mLastFilePos tracks the file offset consumed so far.
///
/// @param logBuffer   Output: rawBuffer (UTF-8 view), readOffset/readLength, truncate info.
/// @param end         Upper-bound file offset for this read (from the triggering event /
///                    checkpoint); also used to decide whether more data remains.
/// @param moreData    Output: true when a full buffer was read (or the checkpoint range is
///                    not yet exhausted), i.e. the caller should read again.
/// @param tryRollback When false and no new bytes could be read, the data is force-consumed
///                    without rolling back the trailing incomplete line (flush on timeout).
void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, bool tryRollback) {
    std::unique_ptr<char[]> gbkMemory;
    char* gbkBuffer = nullptr;
    size_t readCharCount = 0, originReadCount = 0;
    int64_t lastReadPos = 0;
    bool logTooLongSplitFlag = false, fromCpt = false;
    bool allowRollback = true;

    logBuffer.readOffset = mLastFilePos;
    if (!mLogFileOp.IsOpen()) {
        // read flush timeout: the file is already closed, so the only data we can
        // emit is what a previous call left behind in mCache.
        readCharCount = mCache.size();
        gbkMemory.reset(new char[readCharCount + 1]);
        gbkBuffer = gbkMemory.get();
        memcpy(gbkBuffer, mCache.data(), readCharCount);
        // Ignore \n if last is force read
        // (the previous force read emitted a line without its terminating \n;
        //  skip that leftover newline so it is not published as an empty log).
        // Note: gbkBuffer is advanced but gbkMemory still owns the allocation.
        if (gbkBuffer[0] == '\n' && mLastForceRead) {
            ++gbkBuffer;
            ++mLastFilePos;
            logBuffer.readOffset = mLastFilePos;
            --readCharCount;
        }
        mLastForceRead = true;
        allowRollback = false; // flush path: everything must go out, nothing held back
        lastReadPos = mLastFilePos + readCharCount; // logical end offset of buffered data
        originReadCount = readCharCount;
        moreData = false;
    } else {
        size_t READ_BYTE = getNextReadSize(end, fromCpt);
        const size_t lastCacheSize = mCache.size();
        if (READ_BYTE < lastCacheSize) {
            READ_BYTE = lastCacheSize; // this should not happen, just avoid READ_BYTE >= 0 theoretically
        }
        gbkMemory.reset(new char[READ_BYTE + 1]);
        gbkBuffer = gbkMemory.get();
        if (lastCacheSize) {
            READ_BYTE -= lastCacheSize; // reserve space to copy from cache if needed
        }
        TruncateInfo* truncateInfo = nullptr;
        lastReadPos = GetLastReadPos();
        // Read fresh bytes after the region reserved for the cached remainder.
        readCharCount
            = READ_BYTE ? ReadFile(mLogFileOp, gbkBuffer + lastCacheSize, READ_BYTE, lastReadPos, &truncateInfo) : 0UL;
        // Only when there is no new log and not try rollback, then force read
        if (!tryRollback && readCharCount == 0) {
            allowRollback = false;
        }
        if (readCharCount == 0 && (!lastCacheSize || allowRollback)) { // just keep last cache
            return;
        }
        // Container stdio input: inspect the stream once for a BOM / container type
        // marker before processing — presumably sets mHasReadContainerBom; verify in
        // checkContainerType.
        if (mReaderConfig.first->mInputType == FileReaderOptions::InputType::InputContainerStdio
            && !mHasReadContainerBom) {
            checkContainerType(mLogFileOp);
        }
        if (lastCacheSize) {
            memcpy(gbkBuffer, mCache.data(), lastCacheSize); // copy from cache
            readCharCount += lastCacheSize;
        }
        // Ignore \n if last is force read (see comment in the closed-file branch above).
        if (gbkBuffer[0] == '\n' && mLastForceRead) {
            ++gbkBuffer;
            --readCharCount;
            ++mLastFilePos;
            logBuffer.readOffset = mLastFilePos;
        }
        // Remember whether this read was forced, so the next call can skip a leftover \n.
        mLastForceRead = !allowRollback;
        logBuffer.truncateInfo.reset(truncateInfo);
        lastReadPos = mLastFilePos + readCharCount;
        originReadCount = readCharCount;
        moreData = (readCharCount == BUFFER_SIZE);
        auto alignedBytes = readCharCount;
        if (allowRollback) {
            // Presumably trims trailing bytes that would split a multi-byte GBK
            // sequence, so conversion below only sees whole characters — TODO confirm
            // against AlignLastCharacter.
            alignedBytes = AlignLastCharacter(gbkBuffer, readCharCount);
        }
        if (alignedBytes == 0) {
            if (moreData) { // excessively long line without valid wchar
                logTooLongSplitFlag = true;
                alignedBytes = BUFFER_SIZE;
            } else {
                // line is not finished yet nor more data, put all data in cache
                mCache.assign(gbkBuffer, originReadCount);
                return;
            }
        }
        readCharCount = alignedBytes;
    }
    gbkBuffer[readCharCount] = '\0';

    // Collect newline positions in the GBK buffer; the converter uses them to map
    // converted output back to source lines. Sentinel -1 marks "before first line";
    // the final entry is the index of the last byte (treated as end of last line).
    // NOTE(review): if readCharCount were 0 here, `readCharCount - 1` underflows as
    // size_t — the branches above appear to guarantee readCharCount > 0, but confirm.
    vector<long> lineFeedPos = {-1}; // elements point to the last char of each line
    for (long idx = 0; idx < long(readCharCount - 1); ++idx) {
        if (gbkBuffer[idx] == '\n')
            lineFeedPos.push_back(idx);
    }
    lineFeedPos.push_back(readCharCount - 1);

    // Two-pass conversion: first call with a null destination to size the UTF-8
    // output, then convert into a buffer allocated from the LogBuffer's source pool.
    size_t srcLength = readCharCount;
    size_t requiredLen
        = EncodingConverter::GetInstance()->ConvertGbk2Utf8(gbkBuffer, &srcLength, nullptr, 0, lineFeedPos);
    StringBuffer stringMemory = logBuffer.sourcebuffer->AllocateStringBuffer(requiredLen + 1);
    size_t resultCharCount = EncodingConverter::GetInstance()->ConvertGbk2Utf8(
        gbkBuffer, &srcLength, stringMemory.data, stringMemory.capacity, lineFeedPos);
    char* stringBuffer = stringMemory.data; // utf8 buffer
    if (resultCharCount == 0) {
        // Conversion produced nothing: skip the unconvertible aligned region, but keep
        // any rolled-back tail bytes in mCache so they are retried on the next read.
        if (readCharCount < originReadCount) {
            // skip unconvertable part, put rollbacked part in cache
            mCache.assign(gbkBuffer + readCharCount, originReadCount - readCharCount);
        } else {
            mCache.clear();
        }
        mLastFilePos += readCharCount;
        logBuffer.readOffset = mLastFilePos;
        return;
    }
    // Roll back the trailing incomplete log (in the UTF-8 domain). For JSON readers
    // this also runs when rollback is disallowed — presumably to trim a partial JSON
    // object; verify against RemoveLastIncompleteLog.
    int32_t rollbackLineFeedCount = 0;
    int32_t bakResultCharCount = resultCharCount;
    if (allowRollback || mReaderConfig.second->RequiringJsonReader()) {
        resultCharCount = RemoveLastIncompleteLog(stringBuffer, resultCharCount, rollbackLineFeedCount, allowRollback);
    }
    if (resultCharCount == 0) {
        if (moreData) {
            // The whole buffer is one unfinished log: emit it anyway (forced split)
            // rather than stalling, and restore the pre-rollback length.
            resultCharCount = bakResultCharCount;
            rollbackLineFeedCount = 0;
            if (mReaderConfig.second->RequiringJsonReader()) {
                // Deliberately shadows the outer rollbackLineFeedCount: the count from
                // this forced pass must not affect the byte accounting below.
                int32_t rollbackLineFeedCount;
                RemoveLastIncompleteLog(stringBuffer, resultCharCount, rollbackLineFeedCount, false);
            }
            // Cannot get the split position here, so just mark a flag and send alarm later
            logTooLongSplitFlag = true;
        } else {
            // line is not finished yet nor more data, put all data in cache
            mCache.assign(gbkBuffer, originReadCount);
            return;
        }
    }

    // Map the rollback (counted in UTF-8 lines) back to GBK byte counts using the
    // recorded newline positions, so mCache/mLastFilePos stay in the GBK byte domain.
    int32_t lineFeedCount = lineFeedPos.size();
    if (rollbackLineFeedCount > 0 && lineFeedCount >= (1 + rollbackLineFeedCount)) {
        readCharCount -= lineFeedPos[lineFeedCount - 1] - lineFeedPos[lineFeedCount - 1 - rollbackLineFeedCount];
    }
    if (readCharCount < originReadCount) {
        // rollback happend, put rollbacked part in cache
        mCache.assign(gbkBuffer + readCharCount, originReadCount - readCharCount);
    } else {
        mCache.clear();
    }
    // cache is sealed, readCharCount should not change any more
    size_t stringLen = resultCharCount;
    if (stringLen > 0
        && (stringBuffer[stringLen - 1] == '\n'
            || stringBuffer[stringLen - 1]
                == '\0')) { // \0 is for json, such behavior make ilogtail not able to collect binary log
        --stringLen;
    }
    stringBuffer[stringLen] = '\0';
    // When reading from a checkpoint range, signal more data if the range is not exhausted.
    if (mLogFileOp.IsOpen() && !moreData && fromCpt && lastReadPos < end) {
        moreData = true;
    }
    logBuffer.rawBuffer = StringView(stringBuffer, stringLen);
    logBuffer.readLength = readCharCount; // length consumed in GBK bytes, not UTF-8
    setExactlyOnceCheckpointAfterRead(readCharCount);
    mLastFilePos += readCharCount;
    if (logTooLongSplitFlag) {
        LOG_WARNING(sLogger,
                    ("Log is too long and forced to be split at offset: ", mLastFilePos)("file: ", mHostLogPath)(
                        "inode: ", mDevInode.inode)("first 1024B log: ", logBuffer.rawBuffer.substr(0, 1024)));
        std::ostringstream oss;
        oss << "Log is too long and forced to be split at offset: " << ToString(mLastFilePos)
            << " file: " << mHostLogPath << " inode: " << ToString(mDevInode.inode)
            << " first 1024B log: " << logBuffer.rawBuffer.substr(0, 1024) << std::endl;
        AlarmManager::GetInstance()->SendAlarm(
            SPLIT_LOG_FAIL_ALARM, oss.str(), GetRegion(), GetProject(), GetConfigName(), GetLogstore());
    }
    LOG_DEBUG(sLogger,
              ("read gbk buffer, offset", mLastFilePos)("origin read", originReadCount)("at last read", readCharCount));
}