inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/metrics.go (367 lines of code) (raw):

// // Licensed to the Apache Software Foundation (ASF) under one or more // contributor license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright ownership. // The ASF licenses this file to You under the Apache License, Version 2.0 // (the "License"); you may not use this file except in compliance with // the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package dataproxy import ( "errors" "sync" "github.com/prometheus/client_golang/prometheus" ) type metrics struct { errorCounter *prometheus.CounterVec errorCounters map[string]prometheus.Counter errorCounterLock sync.RWMutex retryCounter *prometheus.CounterVec retryCounters map[string]prometheus.Counter retryCounterLock sync.RWMutex timeoutCounter *prometheus.CounterVec timeoutCounters map[string]prometheus.Counter timeoutCounterLock sync.RWMutex messageCounter *prometheus.CounterVec messageCounters map[string]prometheus.Counter messageCounterLock sync.RWMutex updateConnCounter *prometheus.CounterVec updateConnCounters map[string]prometheus.Counter updateConnCounterLock sync.RWMutex pendingMessageGauge *prometheus.GaugeVec pendingMessageGauges map[string]prometheus.Gauge pendingMessageGaugeLock sync.RWMutex batchSizeHistogram *prometheus.HistogramVec batchSizeHistograms map[string]prometheus.Observer batchSizeHistogramLock sync.RWMutex batchTimeHistogram *prometheus.HistogramVec batchTimeHistograms map[string]prometheus.Observer batchTimeHistogramLock sync.RWMutex name string registry prometheus.Registerer } func newMetrics(name string, reg prometheus.Registerer) (*metrics, error) { if name == "" { return nil, errors.New("metrics name is not given") } registry := prometheus.DefaultRegisterer if reg != nil { registry = reg } m := &metrics{ errorCounters: make(map[string]prometheus.Counter), retryCounters: make(map[string]prometheus.Counter), timeoutCounters: make(map[string]prometheus.Counter), messageCounters: make(map[string]prometheus.Counter), updateConnCounters: make(map[string]prometheus.Counter), pendingMessageGauges: make(map[string]prometheus.Gauge), batchSizeHistograms: make(map[string]prometheus.Observer), batchTimeHistograms: make(map[string]prometheus.Observer), name: name, registry: registry, } err := m.init() if err != nil { return nil, err } return m, nil } func (m *metrics) init() error { m.errorCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "data_proxy_error_count", Help: "Counter of error events", }, []string{"name", "code"}) err := m.registry.Register(m.errorCounter) if err != nil { var are prometheus.AlreadyRegisteredError ok := errors.As(err, &are) if !ok { return err } m.errorCounter = are.ExistingCollector.(*prometheus.CounterVec) } m.retryCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "data_proxy_retry_count", Help: "Counter of retry batches", }, []string{"name", "worker"}) err = m.registry.Register(m.retryCounter) if err != nil { var are prometheus.AlreadyRegisteredError ok := errors.As(err, &are) if !ok { return err } m.retryCounter = are.ExistingCollector.(*prometheus.CounterVec) } m.timeoutCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "data_proxy_timeout_count", Help: "Counter of timeout batches", }, []string{"name", "worker"}) err = m.registry.Register(m.timeoutCounter) if err != nil { var are prometheus.AlreadyRegisteredError ok := errors.As(err, &are) if !ok { return err } m.timeoutCounter = are.ExistingCollector.(*prometheus.CounterVec) } m.messageCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "data_proxy_msg_count", Help: "Counter of message", }, []string{"name", "code"}) err = m.registry.Register(m.messageCounter) if err != nil { var are prometheus.AlreadyRegisteredError ok := errors.As(err, &are) if !ok { return err } m.messageCounter = are.ExistingCollector.(*prometheus.CounterVec) } m.updateConnCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "data_proxy_update_conn_count", Help: "Counter of update connection events", }, []string{"name", "code"}) err = m.registry.Register(m.updateConnCounter) if err != nil { var are prometheus.AlreadyRegisteredError ok := errors.As(err, &are) if !ok { return err } m.updateConnCounter = are.ExistingCollector.(*prometheus.CounterVec) } m.pendingMessageGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "data_proxy_pending_msg_gauge", Help: "Gauge of pending message", }, []string{"name", "worker"}) err = m.registry.Register(m.pendingMessageGauge) if err != nil { var are prometheus.AlreadyRegisteredError ok := errors.As(err, &are) if !ok { return err } m.pendingMessageGauge = are.ExistingCollector.(*prometheus.GaugeVec) } m.batchSizeHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: "data_proxy_batch_size", Help: "Histogram of batch size", Buckets: []float64{1024, 2 * 1024, 4 * 1024, 8 * 1024, 16 * 1024, 32 * 1024, 64 * 1024, 128 * 1024, 256 * 1024}, }, []string{"name", "code"}) err = m.registry.Register(m.batchSizeHistogram) if err != nil { var are prometheus.AlreadyRegisteredError ok := errors.As(err, &are) if !ok { return err } m.batchSizeHistogram = are.ExistingCollector.(*prometheus.HistogramVec) } m.batchTimeHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: "data_proxy_batch_time", Help: "Histogram of batch time in milliseconds", Buckets: []float64{5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 50000}, }, []string{"name", "code"}) err = m.registry.Register(m.batchTimeHistogram) if err != nil { var are prometheus.AlreadyRegisteredError ok := errors.As(err, &are) if !ok { return err } m.batchTimeHistogram = are.ExistingCollector.(*prometheus.HistogramVec) } return nil } func (m *metrics) incError(code string) { m.errorCounterLock.RLock() c, ok := m.errorCounters[code] m.errorCounterLock.RUnlock() if ok { c.Add(float64(1)) return } m.errorCounterLock.Lock() defer m.errorCounterLock.Unlock() c, ok = m.errorCounters[code] if ok { c.Add(float64(1)) return } c, err := m.errorCounter.GetMetricWithLabelValues(m.name, code) if err != nil { return } c.Add(float64(1)) m.errorCounters[code] = c } func (m *metrics) incRetry(worker string) { m.retryCounterLock.RLock() c, ok := m.retryCounters[worker] m.retryCounterLock.RUnlock() if ok { c.Add(float64(1)) return } m.retryCounterLock.Lock() defer m.retryCounterLock.Unlock() c, ok = m.retryCounters[worker] if ok { c.Add(float64(1)) return } c, err := m.retryCounter.GetMetricWithLabelValues(m.name, worker) if err != nil { return } c.Add(float64(1)) m.retryCounters[worker] = c } func (m *metrics) incTimeout(worker string) { m.timeoutCounterLock.RLock() c, ok := m.timeoutCounters[worker] m.timeoutCounterLock.RUnlock() if ok { c.Add(float64(1)) return } m.timeoutCounterLock.Lock() defer m.timeoutCounterLock.Unlock() c, ok = m.timeoutCounters[worker] if ok { c.Add(float64(1)) return } c, err := m.timeoutCounter.GetMetricWithLabelValues(m.name, worker) if err != nil { return } c.Add(float64(1)) m.timeoutCounters[worker] = c } func (m *metrics) incMessage(code string) { m.messageCounterLock.RLock() c, ok := m.messageCounters[code] m.messageCounterLock.RUnlock() if ok { c.Add(float64(1)) return } m.messageCounterLock.Lock() defer m.messageCounterLock.Unlock() c, ok = m.messageCounters[code] if ok { c.Add(float64(1)) return } c, err := m.messageCounter.GetMetricWithLabelValues(m.name, code) if err != nil { return } c.Add(float64(1)) m.messageCounters[code] = c } func (m *metrics) incUpdateConn(code string) { m.updateConnCounterLock.RLock() c, ok := m.updateConnCounters[code] m.updateConnCounterLock.RUnlock() if ok { c.Add(float64(1)) return } m.updateConnCounterLock.Lock() defer m.updateConnCounterLock.Unlock() c, ok = m.updateConnCounters[code] if ok { c.Add(float64(1)) return } c, err := m.updateConnCounter.GetMetricWithLabelValues(m.name, code) if err != nil { return } c.Add(float64(1)) m.updateConnCounters[code] = c } func (m *metrics) incPending(worker string) { m.pendingMessageGaugeLock.RLock() c, ok := m.pendingMessageGauges[worker] m.pendingMessageGaugeLock.RUnlock() if ok { c.Add(float64(1)) return } m.pendingMessageGaugeLock.Lock() defer m.pendingMessageGaugeLock.Unlock() c, ok = m.pendingMessageGauges[worker] if ok { c.Add(float64(1)) return } c, err := m.pendingMessageGauge.GetMetricWithLabelValues(m.name, worker) if err != nil { return } c.Add(float64(1)) m.pendingMessageGauges[worker] = c } func (m *metrics) decPending(worker string) { m.pendingMessageGaugeLock.RLock() c, ok := m.pendingMessageGauges[worker] m.pendingMessageGaugeLock.RUnlock() if ok { c.Dec() return } m.pendingMessageGaugeLock.Lock() defer m.pendingMessageGaugeLock.Unlock() c, ok = m.pendingMessageGauges[worker] if ok { c.Dec() return } c, err := m.pendingMessageGauge.GetMetricWithLabelValues(m.name, worker) if err != nil { return } c.Dec() m.pendingMessageGauges[worker] = c } func (m *metrics) observeSize(code string, value int) { m.batchSizeHistogramLock.RLock() o, ok := m.batchSizeHistograms[code] m.batchSizeHistogramLock.RUnlock() if ok { o.Observe(float64(value)) return } m.batchSizeHistogramLock.Lock() defer m.batchSizeHistogramLock.Unlock() o, ok = m.batchSizeHistograms[code] if ok { o.Observe(float64(value)) return } o, err := m.batchSizeHistogram.GetMetricWithLabelValues(m.name, code) if err != nil { return } o.Observe(float64(value)) m.batchSizeHistograms[code] = o } func (m *metrics) observeTime(code string, value int64) { m.batchTimeHistogramLock.RLock() o, ok := m.batchTimeHistograms[code] m.batchTimeHistogramLock.RUnlock() if ok { o.Observe(float64(value)) return } m.batchTimeHistogramLock.Lock() defer m.batchTimeHistogramLock.Unlock() o, ok = m.batchTimeHistograms[code] if ok { o.Observe(float64(value)) return } o, err := m.batchTimeHistogram.GetMetricWithLabelValues(m.name, code) if err != nil { return } o.Observe(float64(value)) m.batchTimeHistograms[code] = o }