core/log/metric/searcher.go (171 lines of code) (raw):

// Copyright 1999-2020 Alibaba Group Holding Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package metric import ( "encoding/binary" "io" "os" "sync" "github.com/alibaba/sentinel-golang/core/base" "github.com/alibaba/sentinel-golang/logging" "github.com/alibaba/sentinel-golang/util" "github.com/pkg/errors" ) const offsetNotFound = -1 type DefaultMetricSearcher struct { reader MetricLogReader baseDir string baseFilename string cachedPos *filePosition mux *sync.Mutex } type filePosition struct { metricFilename string idxFilename string curOffsetInIdx uint64 curSecInIdx uint64 // TODO: cache the idx file handle here? } func (s *DefaultMetricSearcher) FindByTimeAndResource(beginTimeMs uint64, endTimeMs uint64, resource string) ([]*base.MetricItem, error) { return s.searchOffsetAndRead(beginTimeMs, func(filenames []string, fileNo uint32, offset uint64) (items []*base.MetricItem, err error) { return s.reader.ReadMetricsByEndTime(filenames, fileNo, offset, beginTimeMs, endTimeMs, resource) }) } func (s *DefaultMetricSearcher) FindFromTimeWithMaxLines(beginTimeMs uint64, maxLines uint32) ([]*base.MetricItem, error) { return s.searchOffsetAndRead(beginTimeMs, func(filenames []string, fileNo uint32, offset uint64) (items []*base.MetricItem, err error) { return s.reader.ReadMetrics(filenames, fileNo, offset, maxLines) }) } func (s *DefaultMetricSearcher) searchOffsetAndRead(beginTimeMs uint64, doRead func([]string, uint32, uint64) ([]*base.MetricItem, error)) ([]*base.MetricItem, error) { filenames, err := listMetricFiles(s.baseDir, s.baseFilename) if err != nil { return nil, err } // Try to position the latest file index and offset from the cache (fast-path). // If cache is not up-to-date, we'll read from the initial position (offset 0 of the first file). offsetStart, fileNo, err := s.getOffsetStartAndFileIdx(filenames, beginTimeMs) if err != nil { logging.Warn("[searchOffsetAndRead] Failed to getOffsetStartAndFileIdx", "beginTimeMs", beginTimeMs, "err", err.Error()) } fileAmount := uint32(len(filenames)) for i := fileNo; i < fileAmount; i++ { filename := filenames[i] // Retrieve the start offset that is valid for given condition. // If offset = -1, it indicates that current file (i) does not satisfy the condition. offset, err := s.findOffsetToStart(filename, beginTimeMs, offsetStart) if err != nil { logging.Warn("[searchOffsetAndRead] Failed to findOffsetToStart, will try next file", "beginTimeMs", beginTimeMs, "filename", filename, "offsetStart", offsetStart, "err", err) continue } if offset >= 0 { // Read metric items from the offset of current file (number i). return doRead(filenames, i, uint64(offset)) } } return make([]*base.MetricItem, 0), nil } func (s *DefaultMetricSearcher) getOffsetStartAndFileIdx(filenames []string, beginTimeMs uint64) (offsetInIdx uint64, i uint32, err error) { cacheOk, err := s.isPositionInTimeFor(beginTimeMs) if err != nil { return } if cacheOk { for j, v := range filenames { if v != s.cachedPos.metricFilename { i = uint32(j) offsetInIdx = s.cachedPos.curOffsetInIdx break } } } return } func (s *DefaultMetricSearcher) findOffsetToStart(filename string, beginTimeMs uint64, lastPos uint64) (int64, error) { s.cachedPos.idxFilename = "" s.cachedPos.metricFilename = "" idxFilename := formMetricIdxFileName(filename) if _, err := os.Stat(idxFilename); err != nil { return 0, err } beginSec := beginTimeMs / 1000 file, err := os.Open(idxFilename) if err != nil { return 0, errors.Wrap(err, "failed to open metric idx file: "+idxFilename) } defer file.Close() // Set position to the offset recorded in the idx file _, err = file.Seek(int64(lastPos), io.SeekStart) if err != nil { return 0, errors.Wrapf(err, "failed to fseek idx to offset %d", lastPos) } curPos, err := util.FilePosition(file) if err != nil { return 0, nil } s.cachedPos.curOffsetInIdx = uint64(curPos) var sec uint64 = 0 var offset int64 = 0 for { err = binary.Read(file, binary.BigEndian, &sec) if err != nil { if err == io.EOF { // EOF but offset hasn't been found yet, which indicates the expected position is not in current file return offsetNotFound, nil } return 0, err } if sec >= beginSec { break } err = binary.Read(file, binary.BigEndian, &offset) if err != nil { return 0, err } curPos, err := util.FilePosition(file) if err != nil { return 0, nil } s.cachedPos.curOffsetInIdx = uint64(curPos) } err = binary.Read(file, binary.BigEndian, &offset) if err != nil { return 0, err } // Cache the idx filename and position s.cachedPos.metricFilename = filename s.cachedPos.idxFilename = idxFilename s.cachedPos.curSecInIdx = sec return offset, nil } func (s *DefaultMetricSearcher) isPositionInTimeFor(beginTimeMs uint64) (bool, error) { if beginTimeMs/1000 < s.cachedPos.curSecInIdx { return false, nil } idxFilename := s.cachedPos.idxFilename if idxFilename == "" { return false, nil } if _, err := os.Stat(idxFilename); err != nil { return false, err } idxFile, err := openFileAndSeekTo(idxFilename, s.cachedPos.curOffsetInIdx) if err != nil { return false, err } defer idxFile.Close() var sec uint64 err = binary.Read(idxFile, binary.BigEndian, &sec) if err != nil { return false, err } return sec == s.cachedPos.curSecInIdx, nil } func NewDefaultMetricSearcher(baseDir, baseFilename string) (MetricSearcher, error) { if baseDir == "" { return nil, errors.New("empty base directory") } if baseFilename == "" { return nil, errors.New("empty base filename pattern") } if baseDir[len(baseDir)-1] != os.PathSeparator { baseDir = baseDir + string(os.PathSeparator) } reader := newDefaultMetricLogReader() return &DefaultMetricSearcher{ baseDir: baseDir, baseFilename: baseFilename, reader: reader, cachedPos: &filePosition{}, mux: new(sync.Mutex), }, nil }