core/log/metric/writer.go (259 lines of code) (raw):

// Copyright 1999-2020 Alibaba Group Holding Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package metric import ( "bufio" "encoding/binary" "fmt" "os" "path/filepath" "strconv" "strings" "sync" "github.com/alibaba/sentinel-golang/core/base" "github.com/alibaba/sentinel-golang/core/config" "github.com/alibaba/sentinel-golang/logging" "github.com/alibaba/sentinel-golang/util" "github.com/pkg/errors" ) type DefaultMetricLogWriter struct { baseDir string baseFilename string maxSingleSize uint64 maxFileAmount uint32 timezoneOffsetSec int64 latestOpSec int64 curMetricFile *os.File curMetricIdxFile *os.File metricOut *bufio.Writer idxOut *bufio.Writer mux *sync.RWMutex } func (d *DefaultMetricLogWriter) Write(ts uint64, items []*base.MetricItem) error { if len(items) == 0 { return nil } if ts <= 0 { return errors.New(fmt.Sprintf("%s: %d", "Invalid timestamp: ", ts)) } if d.curMetricFile == nil || d.curMetricIdxFile == nil { return errors.New("file handle not initialized") } // Update all metric items to the given timestamp. for _, item := range items { item.Timestamp = ts } d.mux.Lock() defer d.mux.Unlock() timeSec := int64(ts / 1000) if timeSec < d.latestOpSec { // ignore return nil } if timeSec > d.latestOpSec { pos, err := util.FilePosition(d.curMetricFile) if err != nil { return errors.Wrap(err, "cannot get current pos of the metric file") } if err = d.writeIndex(timeSec, pos); err != nil { return errors.Wrap(err, "cannot write metric idx file") } if d.isNewDay(d.latestOpSec, timeSec) { if err = d.rollToNextFile(ts); err != nil { return errors.Wrap(err, "failed to roll the metric log") } } } // Write and flush if err := d.writeItemsAndFlush(items); err != nil { return errors.Wrap(err, "failed to write and flush metric items") } if err := d.rollFileIfSizeExceeded(ts); err != nil { return errors.Wrap(err, "failed to pre-check the rolling condition of metric logs") } if timeSec > d.latestOpSec { // Update the latest timeSec. d.latestOpSec = timeSec } return nil } func (d *DefaultMetricLogWriter) Close() error { d.mux.Lock() defer d.mux.Unlock() if d.curMetricIdxFile != nil { d.curMetricIdxFile.Close() } if d.curMetricFile != nil { return d.curMetricFile.Close() } return nil } func (d *DefaultMetricLogWriter) writeItemsAndFlush(items []*base.MetricItem) error { for _, item := range items { s, err := item.ToFatString() if err != nil { logging.Warn("[writeItemsAndFlush] Failed to convert MetricItem to string", "resourceName", item.Resource, "err", err.Error()) continue } // Append the LF line separator. bs := []byte(s + "\n") _, err = d.metricOut.Write(bs) if err != nil { return nil } } return d.metricOut.Flush() } func (d *DefaultMetricLogWriter) rollFileIfSizeExceeded(time uint64) error { if d.curMetricFile == nil { return nil } stat, err := d.curMetricFile.Stat() if err != nil { return err } if uint64(stat.Size()) >= d.maxSingleSize { return d.rollToNextFile(time) } return nil } func (d *DefaultMetricLogWriter) rollToNextFile(time uint64) error { newFilename, err := d.nextFileNameOfTime(time) if err != nil { return err } return d.closeCurAndNewFile(newFilename) } func (d *DefaultMetricLogWriter) writeIndex(time, offset int64) error { out := d.idxOut if out == nil { return errors.New("index buffered writer not ready") } // Use BigEndian here to keep consistent with DataOutputStream in Java. err := binary.Write(out, binary.BigEndian, time) if err != nil { return err } err = binary.Write(out, binary.BigEndian, offset) if err != nil { return err } return out.Flush() } func (d *DefaultMetricLogWriter) removeDeprecatedFiles() error { files, err := listMetricFiles(d.baseDir, d.baseFilename) if err != nil || len(files) == 0 { return err } amountToRemove := len(files) - int(d.maxFileAmount) + 1 for i := 0; i < amountToRemove; i++ { filename := files[i] idxFilename := formMetricIdxFileName(filename) err = os.Remove(filename) if err != nil { logging.Error(err, "Failed to remove metric log file in DefaultMetricLogWriter.removeDeprecatedFiles()", "filename", filename) } else { logging.Info("[MetricWriter] Metric log file removed in DefaultMetricLogWriter.removeDeprecatedFiles()", "filename", filename) } err = os.Remove(idxFilename) if err != nil { logging.Error(err, "Failed to remove metric log file in DefaultMetricLogWriter.removeDeprecatedFiles()", "idxFilename", idxFilename) } else { logging.Info("[MetricWriter] Metric index file removed", "idxFilename", idxFilename) } } return err } func (d *DefaultMetricLogWriter) nextFileNameOfTime(time uint64) (string, error) { dateStr := util.FormatDate(time) filePattern := d.baseFilename + "." + dateStr list, err := listMetricFilesConditional(d.baseDir, filePattern, func(fn string, p string) bool { return strings.Contains(fn, p) }) if err != nil { return "", err } if len(list) == 0 { return filepath.Join(d.baseDir, filePattern), nil } last := list[len(list)-1] var n uint32 = 0 items := strings.Split(last, ".") if len(items) > 0 { v, err := strconv.ParseUint(items[len(items)-1], 10, 32) if err == nil { n = uint32(v) } } return filepath.Join(d.baseDir, fmt.Sprintf("%s.%d", filePattern, n+1)), nil } func (d *DefaultMetricLogWriter) closeCurAndNewFile(filename string) error { err := d.removeDeprecatedFiles() if err != nil { return err } if d.curMetricFile != nil { if err = d.curMetricFile.Close(); err != nil { logging.Error(err, "Failed to close metric log file in DefaultMetricLogWriter.closeCurAndNewFile()", "curMetricFile", d.curMetricFile.Name()) } } if d.curMetricIdxFile != nil { if err = d.curMetricIdxFile.Close(); err != nil { logging.Error(err, "Failed to close metric index file in DefaultMetricLogWriter.closeCurAndNewFile()", "curMetricIdxFile", d.curMetricIdxFile.Name()) } } // Create new metric log file, whether it exists or not. mf, err := os.Create(filename) if err != nil { return err } logging.Info("[MetricWriter] New metric log file created", "filename", filename) idxFile := formMetricIdxFileName(filename) mif, err := os.Create(idxFile) if err != nil { return err } logging.Info("[MetricWriter] New metric log index file created", "idxFile", idxFile) d.curMetricFile = mf d.metricOut = bufio.NewWriter(mf) d.curMetricIdxFile = mif d.idxOut = bufio.NewWriter(mif) return nil } func (d *DefaultMetricLogWriter) initialize() error { // Create the dir if not exists. err := util.CreateDirIfNotExists(d.baseDir) if err != nil { return err } if d.curMetricFile != nil { return nil } ts := util.CurrentTimeMillis() if err := d.rollToNextFile(ts); err != nil { return errors.Wrap(err, "failed to initialize metric log writer") } d.latestOpSec = int64(ts / 1000) return nil } func (d *DefaultMetricLogWriter) isNewDay(lastSec, sec int64) bool { prevDayTs := (lastSec + d.timezoneOffsetSec) / 86400 newDayTs := (sec + d.timezoneOffsetSec) / 86400 return newDayTs > prevDayTs } func NewDefaultMetricLogWriter(maxSize uint64, maxFileAmount uint32) (MetricLogWriter, error) { return NewDefaultMetricLogWriterOfApp(maxSize, maxFileAmount, config.AppName()) } func NewDefaultMetricLogWriterOfApp(maxSize uint64, maxFileAmount uint32, appName string) (MetricLogWriter, error) { if maxSize == 0 || maxFileAmount == 0 { return nil, errors.New("invalid maxSize or maxFileAmount") } _, offset := util.Now().Zone() logDir := config.LogBaseDir() if len(logDir) == 0 { logDir = config.GetDefaultLogDir() } baseDir := logDir baseFilename := FormMetricFileName(appName, config.LogUsePid()) writer := &DefaultMetricLogWriter{ maxSingleSize: maxSize, maxFileAmount: maxFileAmount, timezoneOffsetSec: int64(offset), latestOpSec: 0, baseDir: baseDir, baseFilename: baseFilename, mux: new(sync.RWMutex), } err := writer.initialize() return writer, err }