in core/log/metric/writer.go [53:103]
func (d *DefaultMetricLogWriter) Write(ts uint64, items []*base.MetricItem) error {
if len(items) == 0 {
return nil
}
if ts <= 0 {
return errors.New(fmt.Sprintf("%s: %d", "Invalid timestamp: ", ts))
}
if d.curMetricFile == nil || d.curMetricIdxFile == nil {
return errors.New("file handle not initialized")
}
// Update all metric items to the given timestamp.
for _, item := range items {
item.Timestamp = ts
}
d.mux.Lock()
defer d.mux.Unlock()
timeSec := int64(ts / 1000)
if timeSec < d.latestOpSec {
// ignore
return nil
}
if timeSec > d.latestOpSec {
pos, err := util.FilePosition(d.curMetricFile)
if err != nil {
return errors.Wrap(err, "cannot get current pos of the metric file")
}
if err = d.writeIndex(timeSec, pos); err != nil {
return errors.Wrap(err, "cannot write metric idx file")
}
if d.isNewDay(d.latestOpSec, timeSec) {
if err = d.rollToNextFile(ts); err != nil {
return errors.Wrap(err, "failed to roll the metric log")
}
}
}
// Write and flush
if err := d.writeItemsAndFlush(items); err != nil {
return errors.Wrap(err, "failed to write and flush metric items")
}
if err := d.rollFileIfSizeExceeded(ts); err != nil {
return errors.Wrap(err, "failed to pre-check the rolling condition of metric logs")
}
if timeSec > d.latestOpSec {
// Update the latest timeSec.
d.latestOpSec = timeSec
}
return nil
}