internal/satellite/telemetry/metricservice/server.go (137 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package metricservice
import (
"context"
"fmt"
"sync"
"time"
"github.com/apache/skywalking-satellite/internal/pkg/log"
"github.com/apache/skywalking-satellite/internal/satellite/sharing"
"github.com/apache/skywalking-satellite/internal/satellite/telemetry"
client "github.com/apache/skywalking-satellite/plugins/client/api"
"github.com/apache/skywalking-satellite/plugins/client/grpc/lb"
server_grpc "github.com/apache/skywalking-satellite/plugins/server/grpc"
"google.golang.org/grpc"
v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
)
func init() {
telemetry.Register("metrics_service", &Server{}, false)
}
type Server struct {
telemetry.MetricsServiceConfig
service string
instance string
metrics map[string]Metric
lock sync.Mutex
prevServerAddr string
meterClient v3.MeterReportServiceClient
reportStream v3.MeterReportService_CollectBatchClient
ctx context.Context
cancel context.CancelFunc
}
func (s *Server) Start(config *telemetry.Config) error {
s.metrics = make(map[string]Metric)
s.MetricsServiceConfig = config.MetricsService
s.service = config.Service
s.instance = config.Instance
s.ctx, s.cancel = context.WithCancel(context.Background())
return nil
}
func (s *Server) AfterSharingStart() error {
plugin := sharing.Manager[s.MetricsServiceConfig.ClientName]
if plugin == nil {
return fmt.Errorf("could not fould client %s", s.MetricsServiceConfig.ClientName)
}
grpcClient, ok := plugin.(client.Client)
if !ok {
return fmt.Errorf("the client is not grpc client")
}
conn := grpcClient.GetConnectedClient().(*grpc.ClientConn)
s.meterClient = v3.NewMeterReportServiceClient(conn)
go func() {
ticker := time.NewTicker(time.Duration(s.Interval) * time.Second)
for {
select {
case <-ticker.C:
if err := s.sendMetrics(); err != nil {
log.Logger.Warnf("send satellite metrics failure: %v", err)
}
case <-context.Background().Done():
return
case <-s.ctx.Done():
s.cancel()
return
}
}
}()
return nil
}
func (s *Server) sendMetrics() error {
if s.reportStream == nil {
if err := s.openBatchStream(); err != nil {
return err
}
}
appender := &MetricsAppender{
time: time.Now().UnixNano() / int64(time.Millisecond),
metrics: make([]*v3.MeterData, 0),
prefix: s.MetricPrefix,
}
for _, metric := range s.metrics {
metric.WriteMetric(appender)
}
if len(appender.metrics) == 0 {
return nil
}
appender.metrics[0].Service = s.service
appender.metrics[0].ServiceInstance = s.instance
if err := s.reportStream.Send(&v3.MeterDataCollection{MeterData: appender.metrics}); err != nil {
if openErr := s.openBatchStream(); openErr != nil {
log.Logger.Warnf("detect send message error and reopen stream failure: %v", openErr)
}
}
return nil
}
func (s *Server) openBatchStream() error {
if s.reportStream != nil {
_, err := s.reportStream.CloseAndRecv()
if err != nil {
log.Logger.Warnf("close satellite meter protocol error: %v", err)
}
}
if meterStream, err := s.meterClient.CollectBatch(lb.WithLoadBalanceConfig(context.Background(), "metricsService", s.prevServerAddr)); err == nil {
s.reportStream = meterStream
s.prevServerAddr = server_grpc.GetPeerAddressFromStreamContext(meterStream.Context())
} else {
return fmt.Errorf("could not start metrics service stream: %v", err)
}
return nil
}
func (s *Server) Close() error {
s.cancel()
if s.reportStream != nil {
if _, err := s.reportStream.CloseAndRecv(); err != nil {
log.Logger.Warnf("error close the meter stream in satellite metrics service: %v", err)
}
}
return nil
}
func (s *Server) Register(name string, metric Metric) {
s.metrics[name] = metric
}
type MetricsAppender struct {
time int64
metrics []*v3.MeterData
prefix string
}
func (a *MetricsAppender) appendSingleValue(name string, labels []*v3.Label, val float64) {
a.metrics = append(a.metrics, &v3.MeterData{
Timestamp: a.time,
Metric: &v3.MeterData_SingleValue{
SingleValue: &v3.MeterSingleValue{
Name: a.prefix + name,
Labels: labels,
Value: val,
},
},
})
}