func()

in core/log/metric/writer.go [53:103]


// Write stamps every item with ts and appends the items to the current metric
// log file, maintaining the per-second index file and rolling the log files
// when needed.
//
// ts is divided by 1000 to obtain the second used for indexing, so it is
// treated as a millisecond timestamp. Each item's Timestamp field is
// overwritten with ts (the caller's items are mutated).
//
// Behavior:
//   - An empty items slice is a no-op and returns nil.
//   - ts == 0 is rejected with an error.
//   - An error is returned when either the metric file or the index file
//     handle is nil.
//   - Items belonging to a second older than the latest written second are
//     dropped silently and nil is returned.
//   - On the first write of a newer second, an index entry (second, current
//     position in the metric file) is written, and the log is rolled to the
//     next file if isNewDay reports a day change.
//   - After the items are written and flushed, rollFileIfSizeExceeded is
//     given the chance to roll the file.
//
// Errors from the underlying steps are returned wrapped with context.
func (d *DefaultMetricLogWriter) Write(ts uint64, items []*base.MetricItem) error {
	if len(items) == 0 {
		return nil
	}
	// ts is unsigned, so zero is the only value that can be invalid here.
	if ts == 0 {
		return fmt.Errorf("Invalid timestamp: %d", ts)
	}

	d.mux.Lock()
	defer d.mux.Unlock()

	// The file handles are used below while holding the lock, so they are
	// validated under the same lock rather than before acquiring it.
	if d.curMetricFile == nil || d.curMetricIdxFile == nil {
		return errors.New("file handle not initialized")
	}
	// Update all metric items to the given timestamp.
	for _, item := range items {
		item.Timestamp = ts
	}

	timeSec := int64(ts / 1000)
	if timeSec < d.latestOpSec {
		// Stale data from an already-passed second: ignore it.
		return nil
	}
	if timeSec > d.latestOpSec {
		// First write of a new second: record where this second's data starts
		// in the metric file before anything is appended.
		pos, err := util.FilePosition(d.curMetricFile)
		if err != nil {
			return errors.Wrap(err, "cannot get current pos of the metric file")
		}
		if err = d.writeIndex(timeSec, pos); err != nil {
			return errors.Wrap(err, "cannot write metric idx file")
		}
		// NOTE(review): the index entry above is written before the day-based
		// roll; confirm rollToNextFile accounts for that.
		if d.isNewDay(d.latestOpSec, timeSec) {
			if err = d.rollToNextFile(ts); err != nil {
				return errors.Wrap(err, "failed to roll the metric log")
			}
		}
	}
	// Write and flush
	if err := d.writeItemsAndFlush(items); err != nil {
		return errors.Wrap(err, "failed to write and flush metric items")
	}
	if err := d.rollFileIfSizeExceeded(ts); err != nil {
		return errors.Wrap(err, "failed to pre-check the rolling condition of metric logs")
	}
	// Advance the latest second only after the write succeeded.
	if timeSec > d.latestOpSec {
		d.latestOpSec = timeSec
	}

	return nil
}