core/log/metric/reader.go (171 lines of code) (raw):

// Copyright 1999-2020 Alibaba Group Holding Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package metric import ( "bufio" "io" "os" "github.com/alibaba/sentinel-golang/core/base" "github.com/alibaba/sentinel-golang/logging" "github.com/pkg/errors" ) const maxItemAmount = 100000 type MetricLogReader interface { ReadMetrics(nameList []string, fileNo uint32, startOffset uint64, maxLines uint32) ([]*base.MetricItem, error) ReadMetricsByEndTime(nameList []string, fileNo uint32, startOffset uint64, beginMs uint64, endMs uint64, resource string) ([]*base.MetricItem, error) } // Not thread-safe itself, but guarded by the outside MetricSearcher. type defaultMetricLogReader struct { } func (r *defaultMetricLogReader) ReadMetrics(nameList []string, fileNo uint32, startOffset uint64, maxLines uint32) ([]*base.MetricItem, error) { if len(nameList) == 0 { return make([]*base.MetricItem, 0), nil } // startOffset: the offset of the first file to read items, shouldContinue, err := r.readMetricsInOneFile(nameList[fileNo], startOffset, maxLines, 0, 0) if err != nil { return nil, err } if !shouldContinue { return items, nil } fileNo++ // Continue reading until the size or time does not satisfy the condition for { if int(fileNo) >= len(nameList) || len(items) >= int(maxLines) { // No files to read. break } arr, shouldContinue, err := r.readMetricsInOneFile(nameList[fileNo], 0, maxLines, getLatestSecond(items), uint32(len(items))) if err != nil { return nil, err } items = append(items, arr...) if !shouldContinue { break } fileNo++ } return items, nil } func (r *defaultMetricLogReader) ReadMetricsByEndTime(nameList []string, fileNo uint32, startOffset uint64, beginMs uint64, endMs uint64, resource string) ([]*base.MetricItem, error) { if len(nameList) == 0 { return make([]*base.MetricItem, 0), nil } // startOffset: the offset of the first file to read items, shouldContinue, err := r.readMetricsInOneFileByEndTime(nameList[fileNo], startOffset, beginMs, endMs, resource, 0) if err != nil { return nil, err } if !shouldContinue { return items, nil } fileNo++ // Continue reading until the size or time does not satisfy the condition for { if int(fileNo) >= len(nameList) { // No files to read. break } arr, shouldContinue, err := r.readMetricsInOneFileByEndTime(nameList[fileNo], 0, beginMs, endMs, resource, uint32(len(items))) if err != nil { return nil, err } items = append(items, arr...) if !shouldContinue { break } fileNo++ } return items, nil } func (r *defaultMetricLogReader) readMetricsInOneFile(filename string, offset uint64, maxLines uint32, lastSec uint64, prevSize uint32) ([]*base.MetricItem, bool, error) { file, err := openFileAndSeekTo(filename, offset) if err != nil { return nil, false, err } defer file.Close() bufReader := bufio.NewReaderSize(file, 8192) items := make([]*base.MetricItem, 0, 1024) for { line, err := readLine(bufReader) if err != nil { if err == io.EOF { shouldContinue := prevSize+uint32(len(items)) < maxLines return items, shouldContinue, nil } return nil, false, errors.Wrap(err, "error when reading lines from file") } item, err := base.MetricItemFromFatString(line) if err != nil { logging.Error(err, "Failed to convert MetricItem to string in defaultMetricLogReader.readMetricsInOneFile()") continue } tsSec := item.Timestamp / 1000 if prevSize+uint32(len(items)) >= maxLines && tsSec != lastSec { return items, false, nil } items = append(items, item) lastSec = tsSec } } func (r *defaultMetricLogReader) readMetricsInOneFileByEndTime(filename string, offset uint64, beginMs uint64, endMs uint64, resource string, prevSize uint32) ([]*base.MetricItem, bool, error) { beginSec := beginMs / 1000 endSec := endMs / 1000 file, err := openFileAndSeekTo(filename, offset) if err != nil { return nil, false, err } defer file.Close() bufReader := bufio.NewReaderSize(file, 8192) items := make([]*base.MetricItem, 0, 1024) for { line, err := readLine(bufReader) if err != nil { if err == io.EOF { return items, true, nil } return nil, false, errors.Wrap(err, "error when reading lines from file") } item, err := base.MetricItemFromFatString(line) if err != nil { logging.Error(err, "Invalid line of metric file in defaultMetricLogReader.readMetricsInOneFileByEndTime()", "fileLine", line) continue } tsSec := item.Timestamp / 1000 // currentSecond should in [beginSec, endSec] if tsSec < beginSec || tsSec > endSec { return items, false, nil } // empty resource name indicates "fetch all" if resource == "" || resource == item.Resource { items = append(items, item) } // Max items limit to avoid infinite reading if len(items)+int(prevSize) >= maxItemAmount { return items, false, nil } } } func readLine(bufReader *bufio.Reader) (string, error) { buf := make([]byte, 0, 64) for { line, ne, err := bufReader.ReadLine() if err != nil { return "", err } buf = append(buf, line...) if !ne { return string(buf), err } // buffer size < line size, so we need to read until the `ne` flag is false. } } func getLatestSecond(items []*base.MetricItem) uint64 { if items == nil || len(items) == 0 { return 0 } return items[len(items)-1].Timestamp / 1000 } func openFileAndSeekTo(filename string, offset uint64) (*os.File, error) { file, err := os.Open(filename) if err != nil { return nil, errors.Wrap(err, "failed to open file: "+filename) } // Set position to the offset recorded in the idx file _, err = file.Seek(int64(offset), io.SeekStart) if err != nil { _ = file.Close() return nil, errors.Wrapf(err, "failed to fseek to offset %d", offset) } return file, nil } func newDefaultMetricLogReader() MetricLogReader { return &defaultMetricLogReader{} }