in common/metric.go [114:181]
func (metric *ReplicationMetric) startup() {
metric.init()
go func() {
tick := 0
// items that need be reset
resetItems := []*MetricDelta{&metric.OplogSuccess}
for range time.NewTicker(1 * time.Second).C {
if metric.isClosed {
break
}
tick++
metric.resetEverySecond(resetItems)
if tick%FrequentInSeconds != 0 {
continue
}
ckpt := atomic.LoadUint64(&metric.CheckpointTimes)
lsnCkpt := atomic.LoadInt64(&metric.LSNCheckpoint)
restrans := atomic.LoadUint64(&metric.Retransmission)
tps := atomic.LoadUint64(&metric.OplogSuccess.Delta)
success := atomic.LoadUint64(&metric.OplogSuccess.Value)
verbose := "[name=%s, stage=%s, get=%d"
if metric.SUBSCRIBE&METRIC_FILTER != 0 {
verbose += fmt.Sprintf(", filter=%d", atomic.LoadUint64(&metric.OplogFilter.Value))
}
if metric.SUBSCRIBE&METRIC_WORKER != 0 {
verbose += fmt.Sprintf(", worker_consume=%d, worker_apply=%d, worker_failed_times=%d",
atomic.LoadUint64(&metric.OplogConsume.Value), // worker fetch
atomic.LoadUint64(&metric.OplogApply.Value), // worker send
atomic.LoadUint64(&metric.OplogFail.Value)) // worker send fail
}
if metric.SUBSCRIBE&METRIC_SUCCESS != 0 {
verbose += fmt.Sprintf(", write_success=%d", success)
}
if metric.SUBSCRIBE&METRIC_TPS != 0 {
verbose += fmt.Sprintf(", tps=%d", tps)
}
if metric.SUBSCRIBE&METRIC_CKPT_TIMES != 0 {
verbose += fmt.Sprintf(", ckpt_times=%d", ckpt)
}
if metric.SUBSCRIBE&METRIC_RETRANSIMISSION != 0 {
verbose += fmt.Sprintf(", retransimit_times=%d", restrans)
}
if metric.SUBSCRIBE&METRIC_TUNNEL_TRAFFIC != 0 {
verbose += fmt.Sprintf(", tunnel_traffic=%s", metric.getTunnelTraffic())
}
if metric.SUBSCRIBE&METRIC_LSN != 0 {
verbose += fmt.Sprintf(", lsn_ckpt={%v, %s}",
ExtractTimestampForLog(lsnCkpt),
TimestampToString(ExtractMongoTimestamp(lsnCkpt)))
verbose += fmt.Sprintf(", lsn_ack={%v, %s}]",
ExtractTimestampForLog(atomic.LoadInt64(&metric.LSNAck)),
TimestampToString(ExtractMongoTimestamp(atomic.LoadInt64(&metric.LSNAck))))
}
if metric.SUBSCRIBE&METRIC_FULLSYNC_WRITE != 0 {
verbose += fmt.Sprintf(", fail=%d", atomic.LoadUint64(&metric.OplogWriteFail.Value))
}
verbose += "]"
LOG.Info(verbose, metric.NAME, metric.STAGE,
atomic.LoadUint64(&metric.OplogGet.Value))
}
LOG.Info("metric[%v] exit", metric)
}()
}