consumer/shard_monitor.go (66 lines of code) (raw):
package consumerLibrary
import (
"time"
"go.uber.org/atomic"
sls "github.com/aliyun/aliyun-log-go-sdk"
"github.com/aliyun/aliyun-log-go-sdk/internal"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
)
// MonitorMetrics is one reporting window's worth of counters and latency
// histograms for a single shard. ShardMonitor replaces the whole value with a
// fresh one each time it reports.
type MonitorMetrics struct {
// Number of fetch requests recorded with a non-nil error.
fetchReqFailedCount atomic.Int64
// Sum of PullLogMeta.RawSize over fetch requests recorded without an error.
logRawSize atomic.Int64
// Elapsed time of every recorded fetch request, failed or not.
fetchLogHistogram internal.TimeHistogram // in us
// Number of process calls recorded with a non-nil error.
processFailedCount atomic.Int64
// Elapsed time of every recorded process call, failed or not.
processHistogram internal.TimeHistogram // in us
}
// ShardMonitor collects fetch and process metrics for one shard and reports
// them through a logger once reportInterval has elapsed.
type ShardMonitor struct {
// Shard identifier supplied to newShardMonitor.
shard int
// Minimum time since lastReportTime before shouldReport returns true.
reportInterval time.Duration
// Set at construction and at the start of every reportByLogger call.
// This is a plain field with no synchronization of its own.
// NOTE(review): presumably only the reporting goroutine touches it - confirm against callers.
lastReportTime time.Time
// Current metrics window; swapped for an empty one by getAndResetMetrics.
metrics atomic.Value // *MonitorMetrics
}
// newShardMonitor builds a ShardMonitor for the given shard. The report timer
// starts at the moment of construction and the monitor begins with an empty
// metrics window, so the Record* methods can be called immediately.
func newShardMonitor(shard int, reportInterval time.Duration) *ShardMonitor {
	sm := new(ShardMonitor)
	sm.shard = shard
	sm.reportInterval = reportInterval
	sm.lastReportTime = time.Now()
	// Seed the atomic value so later Load calls always yield a *MonitorMetrics.
	sm.metrics.Store(&MonitorMetrics{})
	return sm
}
// RecordFetchRequest records the outcome of one fetch request in the current
// metrics window.
//
// A non-nil err increments the failed-fetch counter. Otherwise the raw size
// carried by plm is added to the running total. In both cases the time elapsed
// since start is added to the fetch histogram, in microseconds.
//
// plm is only read when err is nil. A nil plm on the success path contributes
// no bytes rather than causing a nil-pointer panic.
func (m *ShardMonitor) RecordFetchRequest(plm *sls.PullLogMeta, err error, start time.Time) {
	metrics := m.metrics.Load().(*MonitorMetrics)
	switch {
	case err != nil:
		metrics.fetchReqFailedCount.Inc()
	case plm != nil:
		metrics.logRawSize.Add(int64(plm.RawSize))
	}
	// Latency is tracked for failed requests too.
	metrics.fetchLogHistogram.AddSample(float64(time.Since(start).Microseconds()))
}
// RecordProcess records the outcome of one process call in the current metrics
// window: a non-nil err increments the failed-process counter, and the time
// elapsed since start is always added to the process histogram in microseconds.
func (m *ShardMonitor) RecordProcess(err error, start time.Time) {
	window := m.metrics.Load().(*MonitorMetrics)
	if failed := err != nil; failed {
		window.processFailedCount.Inc()
	}
	elapsedMicros := time.Since(start).Microseconds()
	window.processHistogram.AddSample(float64(elapsedMicros))
}
// getAndResetMetrics returns the metrics window accumulated so far and installs
// an empty one in its place.
func (m *ShardMonitor) getAndResetMetrics() *MonitorMetrics {
	// A compare-and-swap is not needed here: only one thread ever calls
	// m.metrics.Store, so a plain Load followed by Store is sufficient.
	fresh := &MonitorMetrics{}
	finished := m.metrics.Load().(*MonitorMetrics)
	m.metrics.Store(fresh)
	return finished
}
// shouldReport reports whether at least reportInterval has passed since the
// last report (or since construction, if nothing has been reported yet).
func (m *ShardMonitor) shouldReport() bool {
	sinceLast := time.Since(m.lastReportTime)
	return sinceLast >= m.reportInterval
}
// reportByLogger restarts the report timer, takes the metrics window collected
// since the previous report (resetting it in the process) and writes its
// counters and histograms to logger as a single info-level entry.
func (m *ShardMonitor) reportByLogger(logger log.Logger) {
	m.lastReportTime = time.Now()
	snapshot := m.getAndResetMetrics()

	infoLogger := level.Info(logger)
	keyvals := []interface{}{
		"msg", "report status",
		"fetchFailed", snapshot.fetchReqFailedCount.Load(),
		"logRawSize", snapshot.logRawSize.Load(),
		"processFailed", snapshot.processFailedCount.Load(),
		"fetch", snapshot.fetchLogHistogram.String(),
		"process", snapshot.processHistogram.String(),
	}
	infoLogger.Log(keyvals...)
}