producer/monitor.go (72 lines of code) (raw):

package producer import ( "sync/atomic" "time" "github.com/aliyun/aliyun-log-go-sdk/internal" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" ) type ProducerMetrics struct { sendBatch internal.TimeHistogram // send logs retryCount atomic.Int32 createBatch atomic.Int32 onFail internal.TimeHistogram // onSuccess callback onSuccess internal.TimeHistogram // onFail callback waitMemory internal.TimeHistogram waitMemoryFailCount atomic.Int32 } type ProducerMonitor struct { metrics atomic.Value // *ProducerMetrics } func newProducerMonitor() *ProducerMonitor { m := &ProducerMonitor{} m.metrics.Store(&ProducerMetrics{}) return m } func (m *ProducerMonitor) recordSuccess(sendBegin time.Time, sendEnd time.Time) { metrics := m.metrics.Load().(*ProducerMetrics) metrics.sendBatch.AddSample(float64(sendEnd.Sub(sendBegin).Microseconds())) metrics.onSuccess.AddSample(float64(time.Since(sendEnd).Microseconds())) } func (m *ProducerMonitor) recordFailure(sendBegin time.Time, sendEnd time.Time) { metrics := m.metrics.Load().(*ProducerMetrics) metrics.sendBatch.AddSample(float64(sendEnd.Sub(sendBegin).Microseconds())) metrics.onFail.AddSample(float64(time.Since(sendEnd).Microseconds())) } func (m *ProducerMonitor) recordRetry(sendCost time.Duration) { metrics := m.metrics.Load().(*ProducerMetrics) metrics.sendBatch.AddSample(float64(sendCost.Microseconds())) metrics.retryCount.Add(1) } func (m *ProducerMonitor) recordWaitMemory(start time.Time) { metrics := m.metrics.Load().(*ProducerMetrics) metrics.waitMemory.AddSample(float64(time.Since(start).Microseconds())) } func (m *ProducerMonitor) incWaitMemoryFail() { metrics := m.metrics.Load().(*ProducerMetrics) metrics.waitMemoryFailCount.Add(1) } func (m *ProducerMonitor) incCreateBatch() { metrics := m.metrics.Load().(*ProducerMetrics) metrics.createBatch.Add(1) } func (m *ProducerMonitor) getAndResetMetrics() *ProducerMetrics { // we dont need cmp and swap, only one thread would call m.metrics.Store old := m.metrics.Load().(*ProducerMetrics) m.metrics.Store(&ProducerMetrics{}) return old } func (m *ProducerMonitor) reportThread(reportInterval time.Duration, logger log.Logger) { ticker := time.NewTicker(reportInterval) for range ticker.C { metrics := m.getAndResetMetrics() level.Info(logger).Log("msg", "report status", "sendBatch", metrics.sendBatch.String(), "retryCount", metrics.retryCount.Load(), "createBatch", metrics.createBatch.Load(), "onSuccess", metrics.onSuccess.String(), "onFail", metrics.onFail.String(), "waitMemory", metrics.waitMemory.String(), "waitMemoryFailCount", metrics.waitMemoryFailCount.Load(), ) } }