func()

in pkg/wal/wal.go [705:827]


func (segment *segment) parseLogEntries() error {
	segmentBytes, err := os.ReadFile(segment.path)
	if err != nil {
		return errors.Wrap(err, "Read WAL segment failed, path: "+segment.path)
	}

	var logEntries []LogEntry
	var data []byte
	var batchLen uint64
	var entryLen uint64
	var seriesIDLen uint16
	var seriesID common.GlobalSeriesID
	var seriesCount uint32
	var timestampsBinaryLen uint16
	var entryEndPosition uint64

	var oldPos uint64
	var pos uint64
	parseNextBatchFlag := true
	segmentBytesLen := uint64(len(segmentBytes))

	for {
		if parseNextBatchFlag {
			if segmentBytesLen <= batchLength {
				break
			}
			data = segmentBytes[pos : pos+batchLength]
			batchLen, err = segment.parseBatchLength(data)
			if err != nil {
				return errors.Wrap(err, "Parse batch length error")
			}

			if segmentBytesLen <= batchLen {
				break
			}

			pos += batchLength
			oldPos = pos
			parseNextBatchFlag = false
		}

		// Parse entryLength.
		data = segmentBytes[pos : pos+entryLength]

		entryLen, err = segment.parseEntryLength(data)
		if err != nil {
			return errors.Wrap(err, "Parse entry length error")
		}
		pos += entryLength

		// Mark entry end-position
		entryEndPosition = pos + entryLen
		if segmentBytesLen < entryEndPosition {
			break
		}

		// Parse seriesIDLength.
		data = segmentBytes[pos : pos+seriesIDLength]
		seriesIDLen, err = segment.parseSeriesIDLength(data)
		if err != nil {
			return errors.Wrap(err, "Parse seriesID length error")
		}
		pos += seriesIDLength

		// Parse seriesID.
		data = segmentBytes[pos : pos+uint64(seriesIDLen)]
		seriesID = segment.parseSeriesID(data)
		pos += uint64(seriesIDLen)

		// Parse series count.
		data = segmentBytes[pos : pos+seriesCountLength]
		seriesCount, err = segment.parseSeriesCountLength(data)
		if err != nil {
			return errors.Wrap(err, "Parse series count length error")
		}
		pos += seriesCountLength

		// Parse timestamps compression binary.
		data = segmentBytes[pos : pos+timestampsBinaryLength]
		timestampsBinaryLen, err = segment.parseTimestampsLength(data)
		if err != nil {
			return errors.Wrap(err, "Parse timestamps length error")
		}
		pos += timestampsBinaryLength
		data = segmentBytes[pos : pos+uint64(timestampsBinaryLen)]
		var timestamps []time.Time
		timestamps, err = segment.parseTimestamps(seriesCount, data)
		if err != nil {
			return errors.Wrap(err, "Parse timestamps compression binary error")
		}
		pos += uint64(timestampsBinaryLen)

		// Parse values compression binary.
		data = segmentBytes[pos:entryEndPosition]
		values, err := segment.parseValuesBinary(data)
		if err != nil {
			return errors.Wrap(err, "Parse values compression binary error")
		}
		if values.Len() != int(seriesCount) {
			return errors.New("values binary items not match series count. series count: " +
				strconv.Itoa(int(seriesCount)) + ", values binary items: " + strconv.Itoa(values.Len()))
		}
		pos = entryEndPosition

		logEntry := &logEntry{
			entryLength: entryLen,
			seriesID:    seriesID,
			count:       seriesCount,
			timestamps:  timestamps,
			values:      values,
		}
		logEntries = append(logEntries, logEntry)

		if pos == segmentBytesLen {
			break
		}
		if pos-oldPos == batchLen {
			parseNextBatchFlag = true
		}
	}
	segment.logEntries = logEntries
	return nil
}