pkg/wal/wal.go (873 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// Package wal (Write-ahead logging) is an independent component to ensure data reliability.
package wal

import (
	"bytes"
	"container/list"
	"encoding/binary"
	"fmt"
	"math"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/golang/snappy"
	"github.com/pkg/errors"
	"go.uber.org/multierr"

	"github.com/apache/skywalking-banyandb/api/common"
	"github.com/apache/skywalking-banyandb/pkg/encoding"
	"github.com/apache/skywalking-banyandb/pkg/logger"
	"github.com/apache/skywalking-banyandb/pkg/run"
)

const (
	moduleName             = "wal"
	segmentNamePrefix      = "seg"
	segmentNameSuffix      = ".wal"
	batchLength            = 8
	entryLength            = 8
	seriesIDLength         = 2
	seriesCountLength      = 4
	timestampVolumeLength  = 8
	timestampsBinaryLength = 2
	valuesBinaryLength     = 2
	parseTimeStr           = "2006-01-02 15:04:05"
	maxRetries             = 3
	maxSegmentID           = uint64(math.MaxUint64) - 1
)

// DefaultOptions for Open().
var DefaultOptions = &Options{
	FileSize:            67108864, // 64MB
	BufferSize:          65535,    // 16KB
	BufferBatchInterval: 3 * time.Second,
	NoSync:              false,
}

// Options for creating Write-ahead Logging.
type Options struct {
	// FileSize is validated by New (non-positive values fall back to the default).
	// NOTE(review): presumably the target size of a segment file in bytes — no
	// code in this file reads it after validation; confirm against callers.
	FileSize int
	// BufferSize is the accumulated request volume, in bytes, above which the
	// batch task hands the in-memory buffer to the flush task.
	BufferSize int
	// BufferBatchInterval is the timer interval after which a non-empty buffer
	// is handed to the flush task even if BufferSize has not been exceeded.
	BufferBatchInterval time.Duration
	// FlushQueueSize is the intended capacity of the queue between the batch
	// task and the flush task.
	FlushQueueSize int
	// NoSync disables the file sync that otherwise follows every batch written
	// to the work segment.
	NoSync bool
}

// WAL denotes a Write-ahead logging.
// Modules who want their data reliable could write data to an instance of WAL.
// A WAL combines several segments, ingesting data on a single opened one.
// Rotating the WAL will create a new segment, marking it as opened and persisting previous segments on the disk.
type WAL interface {
	// Write a logging entity.
	// It returns once the entity has been handed over to the in-memory buffer.
	// The callback function will be called when the entity is flushed on the persistent storage.
	Write(seriesID common.GlobalSeriesID, timestamp time.Time, data []byte, callback func(common.GlobalSeriesID, time.Time, []byte, error))
	// Read specified segment by SegmentID.
	Read(segmentID SegmentID) (Segment, error)
	// ReadAllSegments reads all segments; implementations are expected to
	// return them sorted by their creation time in ascending order.
	ReadAllSegments() ([]Segment, error)
	// Rotate closes the open segment and opens a new one, returning the closed segment details.
	Rotate() (Segment, error)
	// Delete the specified segment.
	Delete(segmentID SegmentID) error
	// Close all of segments and stop WAL work.
	Close() error
}

// SegmentID identifies a segment in a WAL.
type SegmentID uint64

// Segment allows reading underlying segments that hold WAL entities.
type Segment interface {
	// GetSegmentID returns the identifier of the segment.
	GetSegmentID() SegmentID
	// GetLogEntries returns the log entries held by the segment.
	GetLogEntries() []LogEntry
}

// LogEntry is used to obtain the detailed values of a WAL entry.
type LogEntry interface {
	// GetSeriesID returns the series the entry belongs to.
	GetSeriesID() common.GlobalSeriesID
	// GetTimestamps returns the timestamps recorded for the series.
	GetTimestamps() []time.Time
	// GetValues returns the values recorded for the series as a list of []byte items.
	GetValues() *list.List
}

// log implements the WAL interface.
type log struct { writeCloser *run.ChannelCloser flushCloser *run.ChannelCloser chanGroupCloser *run.ChannelGroupCloser buffer buffer logger *logger.Logger bufferWriter *bufferWriter segmentMap map[SegmentID]*segment workSegment *segment writeChannel chan logRequest flushChannel chan buffer path string options Options rwMutex sync.RWMutex closerOnce sync.Once } type segment struct { file *os.File path string logEntries []LogEntry segmentID SegmentID } type logRequest struct { seriesID common.GlobalSeriesID timestamp time.Time callback func(common.GlobalSeriesID, time.Time, []byte, error) data []byte } type logEntry struct { timestamps []time.Time values *list.List seriesID common.GlobalSeriesID entryLength uint64 count uint32 } type buffer struct { timestampMap map[common.GlobalSeriesID][]time.Time valueMap map[common.GlobalSeriesID][]byte callbackMap map[common.GlobalSeriesID][]func(common.GlobalSeriesID, time.Time, []byte, error) count int } type bufferWriter struct { buf *bytes.Buffer seriesIDBuf *bytes.Buffer timestampsBuf *bytes.Buffer dataBuf []byte dataLen int seriesCount uint32 batchLen uint64 } // New creates a WAL instance in the specified path. func New(path string, options *Options) (WAL, error) { // Check configuration options. walOptions := DefaultOptions if options != nil { fileSize := options.FileSize if fileSize <= 0 { fileSize = DefaultOptions.FileSize } bufferSize := options.BufferSize if bufferSize <= 0 { bufferSize = DefaultOptions.BufferSize } bufferBatchInterval := options.BufferBatchInterval if bufferBatchInterval <= 0 { bufferBatchInterval = DefaultOptions.BufferBatchInterval } walOptions = &Options{ FileSize: fileSize, BufferSize: bufferSize, BufferBatchInterval: bufferBatchInterval, NoSync: options.NoSync, } } // Initial WAL path. 
path, err := filepath.Abs(path) if err != nil { return nil, errors.Wrap(err, "Can not get absolute path: "+path) } if err := os.MkdirAll(path, os.ModePerm); err != nil { return nil, err } writeCloser := run.NewChannelCloser() flushCloser := run.NewChannelCloser() chanGroupCloser := run.NewChannelGroupCloser(writeCloser, flushCloser) log := &log{ path: path, options: *walOptions, logger: logger.GetLogger(moduleName), writeChannel: make(chan logRequest), flushChannel: make(chan buffer, walOptions.FlushQueueSize), bufferWriter: newBufferWriter(), writeCloser: writeCloser, flushCloser: flushCloser, chanGroupCloser: chanGroupCloser, buffer: buffer{ timestampMap: make(map[common.GlobalSeriesID][]time.Time), valueMap: make(map[common.GlobalSeriesID][]byte), callbackMap: make(map[common.GlobalSeriesID][]func(common.GlobalSeriesID, time.Time, []byte, error)), count: 0, }, } if err := log.load(); err != nil { return nil, err } log.start() log.logger.Info().Str("path", path).Msg("WAL has be initialized") return log, nil } // Write a logging entity. // It will return immediately when the data is written in the buffer, // The callback function will be called when the entity is flushed on the persistent storage. func (log *log) Write(seriesID common.GlobalSeriesID, timestamp time.Time, data []byte, callback func(common.GlobalSeriesID, time.Time, []byte, error)) { if !log.writeCloser.AddSender() { return } defer log.writeCloser.SenderDone() log.writeChannel <- logRequest{ seriesID: seriesID, timestamp: timestamp, data: data, callback: callback, } } // Read specified segment by SegmentID. func (log *log) Read(segmentID SegmentID) (Segment, error) { log.rwMutex.RLock() defer log.rwMutex.RUnlock() segment := log.segmentMap[segmentID] return segment, nil } // ReadAllSegments reads all segments sorted by their creation time in ascending order. 
func (log *log) ReadAllSegments() ([]Segment, error) { log.rwMutex.RLock() defer log.rwMutex.RUnlock() segments := make([]Segment, 0) for _, segment := range log.segmentMap { segments = append(segments, segment) } return segments, nil } // Rotate closes the open segment and opens a new one, returning the closed segment details. func (log *log) Rotate() (Segment, error) { log.rwMutex.Lock() defer log.rwMutex.Unlock() newSegmentID := uint64(log.workSegment.segmentID) + 1 if newSegmentID > maxSegmentID { return nil, errors.New("Segment ID overflow uint64," + " please delete all WAL segment files and restart") } if err := log.workSegment.file.Close(); err != nil { return nil, errors.Wrap(err, "Close WAL segment error") } // Create new segment. oldWorkSegment := log.workSegment segment := &segment{ segmentID: SegmentID(newSegmentID), path: filepath.Join(log.path, segmentName(newSegmentID)), } if err := segment.openFile(true); err != nil { return nil, errors.Wrap(err, "Open WAL segment error") } log.workSegment = segment // Update segment information. log.segmentMap[log.workSegment.segmentID] = log.workSegment return oldWorkSegment, nil } // Delete the specified segment. func (log *log) Delete(segmentID SegmentID) error { log.rwMutex.Lock() defer log.rwMutex.Unlock() // Segment which will be deleted must be closed. if segmentID == log.workSegment.segmentID { return errors.New("Can not delete the segment which is working") } err := os.Remove(log.segmentMap[segmentID].path) if err != nil { return errors.Wrap(err, "Delete WAL segment error") } delete(log.segmentMap, segmentID) return nil } // Close all of segments and stop WAL work. 
func (log *log) Close() error { var globalErr error log.closerOnce.Do(func() { log.logger.Info().Msg("Closing WAL...") log.chanGroupCloser.CloseThenWait() if err := log.flushBuffer(log.buffer); err != nil { globalErr = multierr.Append(globalErr, err) } if err := log.workSegment.file.Close(); err != nil { globalErr = multierr.Append(globalErr, err) } log.logger.Info().Msg("Closed WAL") }) return globalErr } func (log *log) start() { var initialTasks sync.WaitGroup initialTasks.Add(2) go func() { if !log.writeCloser.AddReceiver() { panic("writeCloser already closed") } defer log.writeCloser.ReceiverDone() log.logger.Info().Msg("Start batch task...") initialTasks.Done() bufferVolume := 0 for { timer := time.NewTimer(log.options.BufferBatchInterval) select { case request, chOpen := <-log.writeChannel: if !chOpen { timer.Stop() log.logger.Info().Msg("Stop batch task when write-channel closed") return } log.buffer.write(request) if log.logger.Debug().Enabled() { log.logger.Debug().Msg("Write request to buffer. elements: " + strconv.Itoa(log.buffer.count)) } bufferVolume += request.seriesID.Volume() + timestampVolumeLength + len(request.data) if bufferVolume > log.options.BufferSize { log.triggerFlushing() bufferVolume = 0 } case <-timer.C: if bufferVolume == 0 { continue } log.triggerFlushing() bufferVolume = 0 case <-log.writeCloser.CloseNotify(): timer.Stop() log.logger.Info().Msg("Stop batch task when close notify") return } timer.Stop() } }() go func() { if !log.flushCloser.AddReceiver() { panic("flushCloser already closed") } defer log.flushCloser.ReceiverDone() log.logger.Info().Msg("Start flush task...") initialTasks.Done() for { select { case batch, chOpen := <-log.flushChannel: if !chOpen { log.logger.Info().Msg("Stop flush task when flush-channel closed") return } startTime := time.Now() var err error for i := 0; i < maxRetries; i++ { if err = log.flushBuffer(batch); err != nil { log.logger.Err(err).Msg("Flushing buffer failed. 
Retrying...") time.Sleep(time.Second) continue } break } if log.logger.Debug().Enabled() { log.logger.Debug().Msg("Flushed buffer to WAL file. elements: " + strconv.Itoa(batch.count) + ", cost: " + time.Since(startTime).String()) } batch.notifyRequests(err) case <-log.flushCloser.CloseNotify(): log.logger.Info().Msg("Stop flush task when close notify") return } } }() initialTasks.Wait() log.logger.Info().Msg("Started WAL") } func (log *log) triggerFlushing() { log.flushChannel <- log.buffer if log.logger.Debug().Enabled() { log.logger.Debug().Msg("Send buffer to flush-channel. elements: " + strconv.Itoa(log.buffer.count)) } log.newBuffer() } func (log *log) newBuffer() { log.buffer = buffer{ timestampMap: make(map[common.GlobalSeriesID][]time.Time), valueMap: make(map[common.GlobalSeriesID][]byte), callbackMap: make(map[common.GlobalSeriesID][]func(common.GlobalSeriesID, time.Time, []byte, error)), count: 0, } } func (log *log) flushBuffer(buffer buffer) error { if buffer.count == 0 { return nil } var err error if err = log.bufferWriter.Reset(); err != nil { return errors.Wrap(err, "Reset buffer writer error") } for seriesID, timestamps := range buffer.timestampMap { log.bufferWriter.ResetSeries() if err = log.bufferWriter.WriteSeriesID(seriesID); err != nil { return errors.Wrap(err, "Write seriesID error") } log.bufferWriter.WriteTimestamps(timestamps) log.bufferWriter.WriteData(buffer.valueMap[seriesID]) if err = log.bufferWriter.AddSeries(); err != nil { return errors.Wrap(err, "Add series error") } } return log.writeWorkSegment(log.bufferWriter.Bytes()) } func (log *log) writeWorkSegment(data []byte) error { log.rwMutex.RLock() defer log.rwMutex.RUnlock() // Write batch data to WAL segment file if _, err := log.workSegment.file.Write(data); err != nil { return errors.Wrap(err, "Write WAL segment file error, file: "+log.workSegment.path) } if !log.options.NoSync { if err := log.workSegment.file.Sync(); err != nil { log.logger.Warn().Msg("Sync WAL segment file to 
disk failed, file: " + log.workSegment.path) } } return nil } func (log *log) load() error { files, err := os.ReadDir(log.path) if err != nil { return errors.Wrap(err, "Can not read dir: "+log.path) } // Load all of WAL segments. var workSegmentID SegmentID log.segmentMap = make(map[SegmentID]*segment) for _, file := range files { name := file.Name() segmentID, parsePathErr := parseSegmentID(name) if parsePathErr != nil { return errors.Wrap(parsePathErr, "Parse file name error, name: "+name) } if segmentID > uint64(workSegmentID) { workSegmentID = SegmentID(segmentID) } segment := &segment{ segmentID: SegmentID(segmentID), path: filepath.Join(log.path, segmentName(segmentID)), } if err = segment.parseLogEntries(); err != nil { return errors.Wrap(err, "Fail to parse log entries") } log.segmentMap[SegmentID(segmentID)] = segment if log.logger.Debug().Enabled() { log.logger.Debug().Msg("Loaded segment file: " + segment.path) } } // If load first time. if len(log.segmentMap) == 0 { segmentID := SegmentID(1) segment := &segment{ segmentID: segmentID, path: filepath.Join(log.path, segmentName(uint64(segmentID))), } log.segmentMap[segmentID] = segment log.workSegment = segment } else { log.workSegment = log.segmentMap[workSegmentID] } if err = log.workSegment.openFile(false); err != nil { return errors.Wrap(err, "Open WAL segment error, file: "+log.workSegment.path) } return nil } func newBufferWriter() *bufferWriter { return &bufferWriter{ buf: bytes.NewBuffer([]byte{}), seriesIDBuf: bytes.NewBuffer([]byte{}), timestampsBuf: bytes.NewBuffer([]byte{}), dataBuf: make([]byte, 128), } } func (w *bufferWriter) Reset() error { w.ResetSeries() w.buf.Reset() w.batchLen = 0 // pre-placement padding err := w.writeBatchLength(0) return err } func (w *bufferWriter) ResetSeries() { w.seriesIDBuf.Reset() w.timestampsBuf.Reset() w.dataLen = 0 w.seriesCount = 0 } func (w *bufferWriter) AddSeries() error { seriesIDBytesLen := uint16(w.seriesIDBuf.Len()) timestampsBytesLen := 
uint16(w.timestampsBuf.Len()) entryLen := seriesIDLength + uint64(seriesIDBytesLen) + seriesCountLength + timestampsBinaryLength + uint64(timestampsBytesLen) + uint64(w.dataLen) var err error if err = w.writeEntryLength(entryLen); err != nil { return err } if err = w.writeSeriesIDLength(seriesIDBytesLen); err != nil { return err } if err = w.writeSeriesID(w.seriesIDBuf.Bytes()); err != nil { return err } if err = w.writeSeriesCount(w.seriesCount); err != nil { return err } if err = w.writeTimestampsLength(timestampsBytesLen); err != nil { return err } if err = w.writeTimestamps(w.timestampsBuf.Bytes()); err != nil { return err } if err = w.writeData(w.dataBuf[:w.dataLen]); err != nil { return err } w.batchLen += entryLen return nil } func (w *bufferWriter) Bytes() []byte { batchBytes := w.buf.Bytes() batchLen := uint64(len(batchBytes)) - batchLength return w.rewriteBatchLength(batchBytes, batchLen) } func (w *bufferWriter) WriteSeriesID(s common.GlobalSeriesID) error { if err := writeUint64(w.seriesIDBuf, uint64(s.SeriesID)); err != nil { return err } if _, err := w.seriesIDBuf.WriteString(s.Name); err != nil { return err } return nil } func (w *bufferWriter) WriteTimestamps(timestamps []time.Time) { timestampWriter := encoding.NewWriter() timestampEncoder := encoding.NewXOREncoder(timestampWriter) timestampWriter.Reset(w.timestampsBuf) for _, timestamp := range timestamps { timestampEncoder.Write(timeToUnixNano(timestamp)) } timestampWriter.Flush() w.seriesCount = uint32(len(timestamps)) } func (w *bufferWriter) WriteData(data []byte) { maxEncodedLen := snappy.MaxEncodedLen(len(data)) dataBufLen := len(w.dataBuf) if dataBufLen < maxEncodedLen { newCapacity := (dataBufLen * 2) - (dataBufLen / 2) if newCapacity < maxEncodedLen { newCapacity = maxEncodedLen } w.dataBuf = make([]byte, newCapacity) } snappyData := snappy.Encode(w.dataBuf, data) w.dataLen = len(snappyData) } func (w *bufferWriter) writeBatchLength(data uint64) error { return writeUint64(w.buf, data) } 
func (w *bufferWriter) rewriteBatchLength(b []byte, batchLen uint64) []byte { _ = b[7] // early bounds check to guarantee safety of writes below b[0] = byte(batchLen) b[1] = byte(batchLen >> 8) b[2] = byte(batchLen >> 16) b[3] = byte(batchLen >> 24) b[4] = byte(batchLen >> 32) b[5] = byte(batchLen >> 40) b[6] = byte(batchLen >> 48) b[7] = byte(batchLen >> 56) return b } func (w *bufferWriter) writeEntryLength(data uint64) error { return writeUint64(w.buf, data) } func (w *bufferWriter) writeSeriesIDLength(data uint16) error { return writeUint16(w.buf, data) } func (w *bufferWriter) writeSeriesID(data []byte) error { _, err := w.buf.Write(data) return err } func (w *bufferWriter) writeSeriesCount(data uint32) error { return writeUint32(w.buf, data) } func (w *bufferWriter) writeTimestampsLength(data uint16) error { return writeUint16(w.buf, data) } func (w *bufferWriter) writeTimestamps(data []byte) error { _, err := w.buf.Write(data) return err } func (w *bufferWriter) writeData(data []byte) error { _, err := w.buf.Write(data) return err } func (segment *segment) GetSegmentID() SegmentID { return segment.segmentID } func (segment *segment) GetLogEntries() []LogEntry { return segment.logEntries } func (segment *segment) openFile(overwrite bool) error { var err error if overwrite { segment.file, err = os.OpenFile(segment.path, os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.ModePerm) } else { segment.file, err = os.OpenFile(segment.path, os.O_CREATE|os.O_RDWR|os.O_APPEND, os.ModePerm) } return err } func (segment *segment) parseLogEntries() error { segmentBytes, err := os.ReadFile(segment.path) if err != nil { return errors.Wrap(err, "Read WAL segment failed, path: "+segment.path) } var logEntries []LogEntry var data []byte var batchLen uint64 var entryLen uint64 var seriesIDLen uint16 var seriesID common.GlobalSeriesID var seriesCount uint32 var timestampsBinaryLen uint16 var entryEndPosition uint64 var oldPos uint64 var pos uint64 parseNextBatchFlag := true segmentBytesLen := 
uint64(len(segmentBytes)) for { if parseNextBatchFlag { if segmentBytesLen <= batchLength { break } data = segmentBytes[pos : pos+batchLength] batchLen, err = segment.parseBatchLength(data) if err != nil { return errors.Wrap(err, "Parse batch length error") } if segmentBytesLen <= batchLen { break } pos += batchLength oldPos = pos parseNextBatchFlag = false } // Parse entryLength. data = segmentBytes[pos : pos+entryLength] entryLen, err = segment.parseEntryLength(data) if err != nil { return errors.Wrap(err, "Parse entry length error") } pos += entryLength // Mark entry end-position entryEndPosition = pos + entryLen if segmentBytesLen < entryEndPosition { break } // Parse seriesIDLength. data = segmentBytes[pos : pos+seriesIDLength] seriesIDLen, err = segment.parseSeriesIDLength(data) if err != nil { return errors.Wrap(err, "Parse seriesID length error") } pos += seriesIDLength // Parse seriesID. data = segmentBytes[pos : pos+uint64(seriesIDLen)] seriesID = segment.parseSeriesID(data) pos += uint64(seriesIDLen) // Parse series count. data = segmentBytes[pos : pos+seriesCountLength] seriesCount, err = segment.parseSeriesCountLength(data) if err != nil { return errors.Wrap(err, "Parse series count length error") } pos += seriesCountLength // Parse timestamps compression binary. data = segmentBytes[pos : pos+timestampsBinaryLength] timestampsBinaryLen, err = segment.parseTimestampsLength(data) if err != nil { return errors.Wrap(err, "Parse timestamps length error") } pos += timestampsBinaryLength data = segmentBytes[pos : pos+uint64(timestampsBinaryLen)] var timestamps []time.Time timestamps, err = segment.parseTimestamps(seriesCount, data) if err != nil { return errors.Wrap(err, "Parse timestamps compression binary error") } pos += uint64(timestampsBinaryLen) // Parse values compression binary. 
data = segmentBytes[pos:entryEndPosition] values, err := segment.parseValuesBinary(data) if err != nil { return errors.Wrap(err, "Parse values compression binary error") } if values.Len() != int(seriesCount) { return errors.New("values binary items not match series count. series count: " + strconv.Itoa(int(seriesCount)) + ", values binary items: " + strconv.Itoa(values.Len())) } pos = entryEndPosition logEntry := &logEntry{ entryLength: entryLen, seriesID: seriesID, count: seriesCount, timestamps: timestamps, values: values, } logEntries = append(logEntries, logEntry) if pos == segmentBytesLen { break } if pos-oldPos == batchLen { parseNextBatchFlag = true } } segment.logEntries = logEntries return nil } func (segment *segment) parseBatchLength(data []byte) (uint64, error) { var batchLen uint64 buf := bytes.NewBuffer(data) if err := binary.Read(buf, binary.LittleEndian, &batchLen); err != nil { return 0, err } return batchLen, nil } func (segment *segment) parseEntryLength(data []byte) (uint64, error) { var entryLen uint64 buf := bytes.NewBuffer(data) if err := binary.Read(buf, binary.LittleEndian, &entryLen); err != nil { return 0, err } return entryLen, nil } func (segment *segment) parseSeriesIDLength(data []byte) (uint16, error) { var seriesIDLen uint16 buf := bytes.NewBuffer(data) if err := binary.Read(buf, binary.LittleEndian, &seriesIDLen); err != nil { return 0, err } return seriesIDLen, nil } func (segment *segment) parseSeriesID(data []byte) common.GlobalSeriesID { return common.GlobalSeriesID{ SeriesID: common.SeriesID(bytesToUint64(data[:8])), Name: string(data[8:]), } } func (segment *segment) parseSeriesCountLength(data []byte) (uint32, error) { var seriesCount uint32 buf := bytes.NewBuffer(data) if err := binary.Read(buf, binary.LittleEndian, &seriesCount); err != nil { return 0, err } return seriesCount, nil } func (segment *segment) parseTimestampsLength(data []byte) (uint16, error) { var timestampsLen uint16 buf := bytes.NewBuffer(data) if err := 
binary.Read(buf, binary.LittleEndian, &timestampsLen); err != nil { return 0, err } return timestampsLen, nil } func (segment *segment) parseTimestamps(seriesCount uint32, data []byte) ([]time.Time, error) { timestampReader := encoding.NewReader(bytes.NewReader(data)) timestampDecoder := encoding.NewXORDecoder(timestampReader) var timestamps []time.Time for i := 0; i < int(seriesCount); i++ { if !timestampDecoder.Next() { return nil, errors.New("Timestamps length not match series count") } timestamps = append(timestamps, unixNanoToTime(timestampDecoder.Value())) } return timestamps, nil } func (segment *segment) parseValuesBinary(data []byte) (*list.List, error) { var err error if data, err = snappy.Decode(nil, data); err != nil { return nil, errors.Wrap(err, "Decode values compression binary error") } values := list.New() position := 0 for { nextPosition, value := readValuesBinary(data, position, valuesBinaryLength) if value == nil { break } values.PushBack(value) position = nextPosition } return values, nil } func (logEntry *logEntry) GetSeriesID() common.GlobalSeriesID { return logEntry.seriesID } func (logEntry *logEntry) GetTimestamps() []time.Time { return logEntry.timestamps } func (logEntry *logEntry) GetValues() *list.List { return logEntry.values } func (buffer *buffer) write(request logRequest) { seriesID := request.seriesID buffer.timestampMap[seriesID] = append(buffer.timestampMap[seriesID], request.timestamp) // Value item: binary-length(2-bytes) + binary data(n-bytes) binaryLen := uint16(len(request.data)) buffer.valueMap[seriesID] = append(buffer.valueMap[seriesID], byte(binaryLen), byte(binaryLen>>8)) buffer.valueMap[seriesID] = append(buffer.valueMap[seriesID], request.data...) 
buffer.callbackMap[seriesID] = append(buffer.callbackMap[seriesID], request.callback) buffer.count++ } func (buffer *buffer) notifyRequests(err error) { var timestamps []time.Time var values []byte var valueItem []byte var valuePos int for seriesID, callbacks := range buffer.callbackMap { timestamps = buffer.timestampMap[seriesID] values = buffer.valueMap[seriesID] valuePos = 0 for index, callback := range callbacks { valuePos, valueItem = readValuesBinary(values, valuePos, valuesBinaryLength) buffer.runningCallback(func() { callback(seriesID, timestamps[index], valueItem, err) }) } } } func (buffer *buffer) runningCallback(callback func()) { defer func() { _ = recover() }() callback() } func segmentName(segmentID uint64) string { return fmt.Sprintf("%v%016x%v", segmentNamePrefix, segmentID, segmentNameSuffix) } // Parse segment ID. segmentName example: seg0000000000000001.wal. func parseSegmentID(segmentName string) (uint64, error) { _ = segmentName[22:] // early bounds check to guarantee safety of reads below if !strings.HasPrefix(segmentName, segmentNamePrefix) { return 0, errors.New("Invalid segment name: " + segmentName) } if !strings.HasSuffix(segmentName, segmentNameSuffix) { return 0, errors.New("Invalid segment name: " + segmentName) } return strconv.ParseUint(segmentName[3:19], 10, 64) } func readValuesBinary(raw []byte, position int, offsetLen int) (int, []byte) { if position == len(raw) { return position, nil } data := raw[position : position+offsetLen] binaryLen := bytesToUint16(data) position += offsetLen data = raw[position : position+int(binaryLen)] position += int(binaryLen) return position, data } func writeUint16(buffer *bytes.Buffer, data uint16) error { var err error if err = buffer.WriteByte(byte(data)); err != nil { return err } if err = buffer.WriteByte(byte(data >> 8)); err != nil { return err } return err } func writeUint32(buffer *bytes.Buffer, data uint32) error { var err error if err = buffer.WriteByte(byte(data)); err != nil { return 
err } if err = buffer.WriteByte(byte(data >> 8)); err != nil { return err } if err = buffer.WriteByte(byte(data >> 16)); err != nil { return err } if err = buffer.WriteByte(byte(data >> 24)); err != nil { return err } return err } func writeUint64(buffer *bytes.Buffer, data uint64) error { var err error if err = buffer.WriteByte(byte(data)); err != nil { return err } if err = buffer.WriteByte(byte(data >> 8)); err != nil { return err } if err = buffer.WriteByte(byte(data >> 16)); err != nil { return err } if err = buffer.WriteByte(byte(data >> 24)); err != nil { return err } if err = buffer.WriteByte(byte(data >> 32)); err != nil { return err } if err = buffer.WriteByte(byte(data >> 40)); err != nil { return err } if err = buffer.WriteByte(byte(data >> 48)); err != nil { return err } if err = buffer.WriteByte(byte(data >> 56)); err != nil { return err } return err } func bytesToUint16(buf []byte) uint16 { return binary.LittleEndian.Uint16(buf) } func bytesToUint64(buf []byte) uint64 { return binary.LittleEndian.Uint64(buf) } func timeToUnixNano(time time.Time) uint64 { return uint64(time.UnixNano()) } func unixNanoToTime(unixNano uint64) time.Time { return time.Unix(0, int64(unixNano)) }