banyand/observability/service.go (73 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package observability
import (
"context"
"net/http"
"time"
"github.com/robfig/cron/v3"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
var (
_ run.Service = (*metricService)(nil)
_ run.Config = (*metricService)(nil)
mux = http.NewServeMux()
)
// NewMetricService returns a metric service.
func NewMetricService() run.Service {
return &metricService{
closer: run.NewCloser(1),
}
}
type metricService struct {
l *logger.Logger
svr *http.Server
closer *run.Closer
scheduler *timestamp.Scheduler
listenAddr string
}
func (p *metricService) FlagSet() *run.FlagSet {
flagSet := run.NewFlagSet("observability")
flagSet.StringVar(&p.listenAddr, "observability-listener-addr", ":2121", "listen addr for observability")
return flagSet
}
func (p *metricService) Validate() error {
if p.listenAddr == "" {
return errNoAddr
}
return nil
}
func (p *metricService) Name() string {
return "metric-service"
}
func (p *metricService) Serve() run.StopNotify {
p.l = logger.GetLogger(p.Name())
clock, _ := timestamp.GetClock(context.TODO())
p.scheduler = timestamp.NewScheduler(p.l, clock)
err := p.scheduler.Register("metrics-collector", cron.Descriptor, "@every 15s", func(now time.Time, logger *logger.Logger) bool {
MetricsCollector.collect()
return true
})
if err != nil {
p.l.Fatal().Err(err).Msg("Failed to register metrics collector")
}
p.svr = &http.Server{
Addr: p.listenAddr,
ReadHeaderTimeout: 3 * time.Second,
Handler: mux,
}
go func() {
defer p.closer.Done()
p.l.Info().Str("listenAddr", p.listenAddr).Msg("Start metric server")
_ = p.svr.ListenAndServe()
}()
return p.closer.CloseNotify()
}
func (p *metricService) GracefulStop() {
if p.scheduler != nil {
p.scheduler.Close()
}
if p.svr != nil {
_ = p.svr.Close()
}
p.closer.CloseThenWait()
}