plugin/metrics/prometheus/metrics.go (116 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prometheus
import (
"fmt"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net/http"
"sync"
)
func init() {
plugin.Register("prometheus", getDefaultMetrics())
}
func getDefaultMetrics() *Metrics {
return &Metrics{
counters: make(map[string]prometheus.Collector),
histograms: make(map[string]prometheus.Collector)}
}
type Metrics struct {
counters map[string]prometheus.Collector
histograms map[string]prometheus.Collector
lock sync.Mutex
once sync.Once
config *Config
}
const nameSpace = "eventmesh"
const subSystem = "runtime"
const (
histogram = iota
counter
)
func (p *Metrics) Type() string {
return metrics.PluginType
}
func (p *Metrics) Setup(name string, dec plugin.Decoder) error {
config := NewDefaultConfig()
err := dec.Decode(config)
if err != nil {
return err
}
p.Init(config)
return nil
}
// Init try to init metrics, include exposing http endpoint
func (p *Metrics) Init(config *Config) {
p.once.Do(func() {
p.config = config
p.exposeEndpoint()
})
}
// exposeEndpoint expose http endpoint
func (p *Metrics) exposeEndpoint() {
go func() {
http.Handle("/metrics", promhttp.Handler())
http.ListenAndServe(fmt.Sprintf(":%s", p.config.EndpointPort), nil)
}()
}
// loadCollector load collector by name and collectorType
func (p *Metrics) loadCollector(name string, collectorType int) prometheus.Collector {
if collector := p.getCollectorByNameAndType(name, collectorType); collector != nil {
return collector
}
return p.registerNewCollector(name, collectorType)
}
func (p *Metrics) getCollectorByNameAndType(name string, collectorType int) prometheus.Collector {
switch collectorType {
case counter:
return p.counters[name]
case histogram:
return p.histograms[name]
default:
panic("prometheus metrics plugin get collector error, illegal collector type")
}
}
// registerNewCollector create and register new collector of collectorType
func (p *Metrics) registerNewCollector(name string, collectorType int) prometheus.Collector {
p.lock.Lock()
defer p.lock.Unlock()
if collector := p.getCollectorByNameAndType(name, collectorType); collector != nil {
return collector
}
var collector prometheus.Collector
switch collectorType {
case counter:
collector = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: nameSpace,
Subsystem: subSystem,
Name: name,
}, []string{"label"},
)
p.counters[name] = collector
case histogram:
collector = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: nameSpace,
Subsystem: subSystem,
Name: name,
Buckets: prometheus.ExponentialBuckets(1, 2, 13),
}, []string{"label"},
)
p.histograms[name] = collector
default:
panic("prometheus metrics plugin register collector error, illegal collector type")
}
prometheus.MustRegister(collector)
return collector
}
func (p *Metrics) IncCount(name string, label string) {
collector := p.loadCollector(name, counter).(*prometheus.CounterVec)
collector.With(prometheus.Labels{"label": label}).Inc()
}
func (p *Metrics) AddCount(name string, label string, val float64) {
collector := p.loadCollector(name, counter).(*prometheus.CounterVec)
collector.With(prometheus.Labels{"label": label}).Add(val)
}
func (p *Metrics) RecordLatency(name string, label string, latency float64) {
collector := p.loadCollector(name, histogram).(*prometheus.HistogramVec)
collector.With(prometheus.Labels{"label": label}).Observe(latency)
}