in pkg/wal/wal.go [705:827]
func (segment *segment) parseLogEntries() error {
segmentBytes, err := os.ReadFile(segment.path)
if err != nil {
return errors.Wrap(err, "Read WAL segment failed, path: "+segment.path)
}
var logEntries []LogEntry
var data []byte
var batchLen uint64
var entryLen uint64
var seriesIDLen uint16
var seriesID common.GlobalSeriesID
var seriesCount uint32
var timestampsBinaryLen uint16
var entryEndPosition uint64
var oldPos uint64
var pos uint64
parseNextBatchFlag := true
segmentBytesLen := uint64(len(segmentBytes))
for {
if parseNextBatchFlag {
if segmentBytesLen <= batchLength {
break
}
data = segmentBytes[pos : pos+batchLength]
batchLen, err = segment.parseBatchLength(data)
if err != nil {
return errors.Wrap(err, "Parse batch length error")
}
if segmentBytesLen <= batchLen {
break
}
pos += batchLength
oldPos = pos
parseNextBatchFlag = false
}
// Parse entryLength.
data = segmentBytes[pos : pos+entryLength]
entryLen, err = segment.parseEntryLength(data)
if err != nil {
return errors.Wrap(err, "Parse entry length error")
}
pos += entryLength
// Mark entry end-position
entryEndPosition = pos + entryLen
if segmentBytesLen < entryEndPosition {
break
}
// Parse seriesIDLength.
data = segmentBytes[pos : pos+seriesIDLength]
seriesIDLen, err = segment.parseSeriesIDLength(data)
if err != nil {
return errors.Wrap(err, "Parse seriesID length error")
}
pos += seriesIDLength
// Parse seriesID.
data = segmentBytes[pos : pos+uint64(seriesIDLen)]
seriesID = segment.parseSeriesID(data)
pos += uint64(seriesIDLen)
// Parse series count.
data = segmentBytes[pos : pos+seriesCountLength]
seriesCount, err = segment.parseSeriesCountLength(data)
if err != nil {
return errors.Wrap(err, "Parse series count length error")
}
pos += seriesCountLength
// Parse timestamps compression binary.
data = segmentBytes[pos : pos+timestampsBinaryLength]
timestampsBinaryLen, err = segment.parseTimestampsLength(data)
if err != nil {
return errors.Wrap(err, "Parse timestamps length error")
}
pos += timestampsBinaryLength
data = segmentBytes[pos : pos+uint64(timestampsBinaryLen)]
var timestamps []time.Time
timestamps, err = segment.parseTimestamps(seriesCount, data)
if err != nil {
return errors.Wrap(err, "Parse timestamps compression binary error")
}
pos += uint64(timestampsBinaryLen)
// Parse values compression binary.
data = segmentBytes[pos:entryEndPosition]
values, err := segment.parseValuesBinary(data)
if err != nil {
return errors.Wrap(err, "Parse values compression binary error")
}
if values.Len() != int(seriesCount) {
return errors.New("values binary items not match series count. series count: " +
strconv.Itoa(int(seriesCount)) + ", values binary items: " + strconv.Itoa(values.Len()))
}
pos = entryEndPosition
logEntry := &logEntry{
entryLength: entryLen,
seriesID: seriesID,
count: seriesCount,
timestamps: timestamps,
values: values,
}
logEntries = append(logEntries, logEntry)
if pos == segmentBytesLen {
break
}
if pos-oldPos == batchLen {
parseNextBatchFlag = true
}
}
segment.logEntries = logEntries
return nil
}