pkg/profiling/continuous/base/metrics.go (77 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package base
import (
"context"
"time"
"github.com/apache/skywalking-rover/pkg/process/api"
v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
)
// MetricsAppender buffers meter data in memory, grouped by service instance,
// until Flush sends it through a MeterReportServiceClient.
type MetricsAppender struct {
// prefix is joined to every meter name with an underscore (see formatName).
prefix string
// timestamp is the Unix time in milliseconds captured when the appender was
// created; every meter appended to this instance carries this same value.
timestamp int64
// meters holds the buffered meter data keyed by service/instance pair, so
// that each pair is sent as its own collection.
meters map[serviceInstanceMetadata][]*v3.MeterData // split by service instance
}
// NewMetricsAppender returns an empty appender whose meter names will be
// prefixed with prefix. The creation time (Unix milliseconds) is captured here
// and reused as the timestamp of every meter appended later.
func NewMetricsAppender(prefix string) *MetricsAppender {
	appender := new(MetricsAppender)
	appender.prefix = prefix
	appender.timestamp = time.Now().UnixMilli()
	// The map must be non-nil before the first append writes to it.
	appender.meters = map[serviceInstanceMetadata][]*v3.MeterData{}
	return appender
}
// AppendProcessSingleValue buffers one single-value meter for the process p.
//
// The meter name is the appender prefix joined to name with an underscore.
// Every entry of labels is copied into the meter, followed by two extra labels,
// "process_name" and "layer", taken from the process entity. The meter is
// stored under the entity's service/instance pair and stays buffered until
// Flush is called. The caller's labels map is not modified.
//
// The relative order of the caller-supplied labels follows Go map iteration
// and is therefore unspecified.
func (m *MetricsAppender) AppendProcessSingleValue(name string, p api.ProcessInterface, labels map[string]string, value float64) {
	// Resolve the entity once instead of once per field read.
	entity := p.Entity()

	// The final size is known: the caller's labels plus the two added below.
	transformLabels := make([]*v3.Label, 0, len(labels)+2)
	for k, v := range labels {
		transformLabels = append(transformLabels, &v3.Label{Name: k, Value: v})
	}
	transformLabels = append(transformLabels,
		&v3.Label{Name: "process_name", Value: entity.ProcessName},
		&v3.Label{Name: "layer", Value: entity.Layer})

	metadata := serviceInstanceMetadata{
		service:  entity.ServiceName,
		instance: entity.InstanceName,
	}
	m.meters[metadata] = append(m.meters[metadata], &v3.MeterData{
		Service:         entity.ServiceName,
		ServiceInstance: entity.InstanceName,
		Timestamp:       m.timestamp,
		Metric: &v3.MeterData_SingleValue{
			SingleValue: &v3.MeterSingleValue{
				Name:   m.formatName(name),
				Labels: transformLabels,
				Value:  value,
			},
		},
	})
}
// Flush sends all buffered meters through a single CollectBatch stream opened
// on client, one MeterDataCollection per service/instance pair, and then
// closes the stream.
//
// It returns nil without opening a stream when nothing is buffered. If opening
// the stream or any Send fails, that error is returned; otherwise the result
// of closing the stream is returned. The buffered meters are not cleared by
// this method.
func (m *MetricsAppender) Flush(ctx context.Context, client v3.MeterReportServiceClient) error {
	if len(m.meters) == 0 {
		return nil
	}

	stream, err := client.CollectBatch(ctx)
	if err != nil {
		return err
	}

	for key := range m.meters {
		sendErr := stream.Send(&v3.MeterDataCollection{MeterData: m.meters[key]})
		if sendErr == nil {
			continue
		}
		// Best-effort close: the send failure is what the caller is told
		// about, so the result of closing is intentionally dropped.
		// NOTE(review): if this is a gRPC client stream, a Send error of
		// io.EOF means the real status is only available from the receive
		// side, which is discarded here — confirm this is intended.
		_ = m.closeSteam(stream)
		return sendErr
	}

	return m.closeSteam(stream)
}
// closeSteam calls CloseAndRecv on the batch stream, discards the response
// and returns only the error (nil on success).
func (m *MetricsAppender) closeSteam(batch v3.MeterReportService_CollectBatchClient) error {
	_, closeErr := batch.CloseAndRecv()
	return closeErr
}
// serviceInstanceMetadata identifies a service instance by name. It is the
// map key under which MetricsAppender groups its buffered meters.
type serviceInstanceMetadata struct {
// service is the service name taken from the process entity.
service string
// instance is the instance name taken from the process entity.
instance string
}
// formatName builds the full meter name by joining the appender prefix and
// name with an underscore. No validation or trimming is applied to either part.
func (m *MetricsAppender) formatName(name string) string {
	const separator = "_"
	return m.prefix + separator + name
}