// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package grpc
import (
"context"
"io"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
configuration "skywalking.apache.org/repo/goapi/collect/agent/configuration/v3"
agentv3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
logv3 "skywalking.apache.org/repo/goapi/collect/logging/v3"
managementv3 "skywalking.apache.org/repo/goapi/collect/management/v3"
"github.com/apache/skywalking-go/plugins/core/operator"
"github.com/apache/skywalking-go/plugins/core/reporter"
)
const (
maxSendQueueSize int32 = 30000
defaultCheckInterval = 20 * time.Second
defaultCDSInterval = 20 * time.Second
)
// NewGRPCReporter creates a new reporter that sends data to the gRPC OAP server. Only one backend address is allowed.
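//
// A minimal usage sketch (the backend address, entity values, and logger variable are placeholders):
//
//	rep, err := NewGRPCReporter(logger, "127.0.0.1:11800")
//	if err != nil {
//		// handle the error
//	}
//	rep.Boot(&reporter.Entity{ServiceName: "demo", ServiceInstanceName: "demo-instance"}, nil)
//	defer rep.Close()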
func NewGRPCReporter(logger operator.LogOperator, serverAddr string, opts ...ReporterOption) (reporter.Reporter, error) {
r := &gRPCReporter{
logger: logger,
tracingSendCh: make(chan *agentv3.SegmentObject, maxSendQueueSize),
metricsSendCh: make(chan []*agentv3.MeterData, maxSendQueueSize),
logSendCh: make(chan *logv3.LogData, maxSendQueueSize),
checkInterval: defaultCheckInterval,
cdsInterval: defaultCDSInterval, // CDS is enabled by default
connectionStatus: reporter.ConnectionStatusConnected,
}
for _, o := range opts {
o(r)
}
var credsDialOption grpc.DialOption
if r.creds != nil {
// use TLS when credentials are provided
credsDialOption = grpc.WithTransportCredentials(r.creds)
} else {
credsDialOption = grpc.WithTransportCredentials(insecure.NewCredentials())
}
conn, err := grpc.Dial(serverAddr, credsDialOption, grpc.WithConnectParams(grpc.ConnectParams{
// cap the maximum reconnect backoff delay at the check interval
Backoff: backoff.Config{
BaseDelay: 1.0 * time.Second,
Multiplier: 1.6,
Jitter: 0.2,
MaxDelay: r.checkInterval,
},
}))
if err != nil {
return nil, err
}
r.conn = conn
r.traceClient = agentv3.NewTraceSegmentReportServiceClient(r.conn)
r.metricsClient = agentv3.NewMeterReportServiceClient(r.conn)
r.logClient = logv3.NewLogReportServiceClient(r.conn)
r.managementClient = managementv3.NewManagementServiceClient(r.conn)
if r.cdsInterval > 0 {
r.cdsClient = configuration.NewConfigurationDiscoveryServiceClient(r.conn)
r.cdsService = reporter.NewConfigDiscoveryService()
}
return r, nil
}
type gRPCReporter struct {
entity *reporter.Entity
logger operator.LogOperator
tracingSendCh chan *agentv3.SegmentObject
metricsSendCh chan []*agentv3.MeterData
logSendCh chan *logv3.LogData
conn *grpc.ClientConn
traceClient agentv3.TraceSegmentReportServiceClient
metricsClient agentv3.MeterReportServiceClient
logClient logv3.LogReportServiceClient
managementClient managementv3.ManagementServiceClient
checkInterval time.Duration
cdsInterval time.Duration
cdsService *reporter.ConfigDiscoveryService
cdsClient configuration.ConfigurationDiscoveryServiceClient
md metadata.MD
creds credentials.TransportCredentials
// bootFlag is set once Boot has been executed
bootFlag bool
connectionStatus reporter.ConnectionStatus
}
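// Boot starts the reporter with the given entity: it launches the send pipelines,
// the keep-alive check, and the CDS configuration fetcher.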
func (r *gRPCReporter) Boot(entity *reporter.Entity, cdsWatchers []reporter.AgentConfigChangeWatcher) {
r.entity = entity
r.initSendPipeline()
r.check()
r.initCDS(cdsWatchers)
r.bootFlag = true
}
func (r *gRPCReporter) ConnectionStatus() reporter.ConnectionStatus {
return r.connectionStatus
}
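// SendTracing converts the finished spans of one segment into a SegmentObject and
// queues it on the tracing channel; the segment is dropped when the buffer is full.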
func (r *gRPCReporter) SendTracing(spans []reporter.ReportedSpan) {
spanSize := len(spans)
if spanSize < 1 {
return
}
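// the last span in the slice is the segment root; its context supplies the trace ID and segment ID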
rootSpan := spans[spanSize-1]
rootCtx := rootSpan.Context()
segmentObject := &agentv3.SegmentObject{
TraceId: rootCtx.GetTraceID(),
TraceSegmentId: rootCtx.GetSegmentID(),
Spans: make([]*agentv3.SpanObject, spanSize),
Service: r.entity.ServiceName,
ServiceInstance: r.entity.ServiceInstanceName,
}
for i, s := range spans {
spanCtx := s.Context()
segmentObject.Spans[i] = &agentv3.SpanObject{
SpanId: spanCtx.GetSpanID(),
ParentSpanId: spanCtx.GetParentSpanID(),
StartTime: s.StartTime(),
EndTime: s.EndTime(),
OperationName: s.OperationName(),
Peer: s.Peer(),
SpanType: s.SpanType(),
SpanLayer: s.SpanLayer(),
ComponentId: s.ComponentID(),
IsError: s.IsError(),
Tags: s.Tags(),
Logs: s.Logs(),
}
srr := make([]*agentv3.SegmentReference, 0)
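// the segment root may reference a parent span in another segment of the same trace (cross-thread reference)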
if i == (spanSize-1) && spanCtx.GetParentSpanID() > -1 {
srr = append(srr, &agentv3.SegmentReference{
RefType: agentv3.RefType_CrossThread,
TraceId: spanCtx.GetTraceID(),
ParentTraceSegmentId: spanCtx.GetParentSegmentID(),
ParentSpanId: spanCtx.GetParentSpanID(),
ParentService: r.entity.ServiceName,
ParentServiceInstance: r.entity.ServiceInstanceName,
})
}
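// references propagated from upstream callers become cross-process references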
if len(s.Refs()) > 0 {
for _, tc := range s.Refs() {
srr = append(srr, &agentv3.SegmentReference{
RefType: agentv3.RefType_CrossProcess,
TraceId: spanCtx.GetTraceID(),
ParentTraceSegmentId: tc.GetParentSegmentID(),
ParentSpanId: tc.GetParentSpanID(),
ParentService: tc.GetParentService(),
ParentServiceInstance: tc.GetParentServiceInstance(),
ParentEndpoint: tc.GetParentEndpoint(),
NetworkAddressUsedAtPeer: tc.GetAddressUsedAtClient(),
})
}
}
segmentObject.Spans[i].Refs = srr
}
defer func() {
// recover from the panic caused by sending on the closed tracingSendCh
if err := recover(); err != nil {
r.logger.Errorf("reporter segment err %v", err)
}
}()
select {
case r.tracingSendCh <- segmentObject:
default:
r.logger.Errorf("reach max tracing send buffer")
}
}
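// SendMetrics converts the reported meters into a MeterData batch and queues it on the metrics channel.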
func (r *gRPCReporter) SendMetrics(metrics []reporter.ReportedMeter) {
if len(metrics) == 0 {
return
}
meters := make([]*agentv3.MeterData, len(metrics))
for i, m := range metrics {
meter := &agentv3.MeterData{}
switch data := m.(type) {
case reporter.ReportedMeterSingleValue:
meter.Metric = &agentv3.MeterData_SingleValue{
SingleValue: &agentv3.MeterSingleValue{
Name: data.Name(),
Labels: r.convertLabels(data.Labels()),
Value: data.Value(),
},
}
case reporter.ReportedMeterHistogram:
buckets := make([]*agentv3.MeterBucketValue, len(data.BucketValues()))
for i, b := range data.BucketValues() {
buckets[i] = &agentv3.MeterBucketValue{
Bucket: b.Bucket(),
Count: b.Count(),
IsNegativeInfinity: b.IsNegativeInfinity(),
}
}
meter.Metric = &agentv3.MeterData_Histogram{
Histogram: &agentv3.MeterHistogram{
Name: data.Name(),
Labels: r.convertLabels(data.Labels()),
Values: buckets,
},
}
}
meters[i] = meter
}
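// the batch-level service, instance, and timestamp are carried on the first element only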
meters[0].Service = r.entity.ServiceName
meters[0].ServiceInstance = r.entity.ServiceInstanceName
meters[0].Timestamp = time.Now().UnixNano() / int64(time.Millisecond)
defer func() {
// recover from the panic caused by sending on the closed metricsSendCh
if err := recover(); err != nil {
r.logger.Errorf("reporter metrics err %v", err)
}
}()
select {
case r.metricsSendCh <- meters:
default:
r.logger.Errorf("reach max metrics send buffer")
}
}
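// SendLog queues a log record on the log channel.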
func (r *gRPCReporter) SendLog(log *logv3.LogData) {
defer func() {
if err := recover(); err != nil {
r.logger.Errorf("reporter log err %v", err)
}
}()
select {
case r.logSendCh <- log:
default:
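// the log is silently dropped when the send buffer is full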
}
}
func (r *gRPCReporter) convertLabels(labels map[string]string) []*agentv3.Label {
if len(labels) == 0 {
return nil
}
ls := make([]*agentv3.Label, 0)
for k, v := range labels {
ls = append(ls, &agentv3.Label{
Name: k,
Value: v,
})
}
return ls
}
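// Close shuts down the reporter. Once Boot has run, closing the send channels lets the
// pipeline goroutines drain pending data and close the gRPC connection; otherwise the
// connection is closed directly.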
func (r *gRPCReporter) Close() {
if r.bootFlag {
if r.tracingSendCh != nil {
close(r.tracingSendCh)
}
if r.metricsSendCh != nil {
close(r.metricsSendCh)
}
if r.logSendCh != nil {
close(r.logSendCh) // close the log channel as well so its pipeline goroutine can exit
}
} else {
r.closeGRPCConn()
}
}
func (r *gRPCReporter) closeGRPCConn() {
if r.conn != nil {
if err := r.conn.Close(); err != nil {
r.logger.Error(err)
}
}
}
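// initSendPipeline starts one goroutine per signal type (segments, metrics, logs). Each
// goroutine keeps a gRPC stream open, re-creates the stream after a send failure, and
// exits once its channel is closed or the connection is shut down.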
// nolint
func (r *gRPCReporter) initSendPipeline() {
if r.traceClient == nil {
return
}
go func() {
StreamLoop:
for {
switch r.updateConnectionStatus() {
case reporter.ConnectionStatusShutdown:
// the connection has been shut down; stop this pipeline goroutine
break StreamLoop
case reporter.ConnectionStatusDisconnect:
time.Sleep(5 * time.Second)
continue StreamLoop
}
stream, err := r.traceClient.Collect(metadata.NewOutgoingContext(context.Background(), r.md))
if err != nil {
r.logger.Errorf("open stream error %v", err)
time.Sleep(5 * time.Second)
continue StreamLoop
}
for s := range r.tracingSendCh {
err = stream.Send(s)
if err != nil {
r.logger.Errorf("send segment error %v", err)
r.closeTracingStream(stream)
continue StreamLoop
}
}
r.closeTracingStream(stream)
r.closeGRPCConn()
break
}
}()
go func() {
StreamLoop:
for {
switch r.updateConnectionStatus() {
case reporter.ConnectionStatusShutdown:
// the connection has been shut down; stop this pipeline goroutine
break StreamLoop
case reporter.ConnectionStatusDisconnect:
time.Sleep(5 * time.Second)
continue StreamLoop
}
stream, err := r.metricsClient.CollectBatch(metadata.NewOutgoingContext(context.Background(), r.md))
if err != nil {
r.logger.Errorf("open stream error %v", err)
time.Sleep(5 * time.Second)
continue StreamLoop
}
for s := range r.metricsSendCh {
err = stream.Send(&agentv3.MeterDataCollection{
MeterData: s,
})
if err != nil {
r.logger.Errorf("send metrics error %v", err)
r.closeMetricsStream(stream)
continue StreamLoop
}
}
r.closeMetricsStream(stream)
break
}
}()
go func() {
StreamLoop:
for {
switch r.updateConnectionStatus() {
case reporter.ConnectionStatusShutdown:
// the connection has been shut down; stop this pipeline goroutine
break StreamLoop
case reporter.ConnectionStatusDisconnect:
time.Sleep(5 * time.Second)
continue StreamLoop
}
stream, err := r.logClient.Collect(metadata.NewOutgoingContext(context.Background(), r.md))
if err != nil {
r.logger.Errorf("open stream error %v", err)
time.Sleep(5 * time.Second)
continue StreamLoop
}
for s := range r.logSendCh {
err = stream.Send(s)
if err != nil {
r.logger.Errorf("send log error %v", err)
r.closeLogStream(stream)
continue StreamLoop
}
}
r.closeLogStream(stream)
break
}
}()
}
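// updateConnectionStatus maps the underlying gRPC connection state onto the reporter
// connection status and caches the result.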
func (r *gRPCReporter) updateConnectionStatus() reporter.ConnectionStatus {
state := r.conn.GetState()
switch state {
case connectivity.TransientFailure:
r.connectionStatus = reporter.ConnectionStatusDisconnect
case connectivity.Shutdown:
r.connectionStatus = reporter.ConnectionStatusShutdown
default:
r.connectionStatus = reporter.ConnectionStatusConnected
}
return r.connectionStatus
}
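// initCDS starts a goroutine that periodically fetches dynamic configuration from the
// backend and forwards ConfigurationDiscoveryCommand results to the bound watchers.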
func (r *gRPCReporter) initCDS(cdsWatchers []reporter.AgentConfigChangeWatcher) {
if r.cdsClient == nil {
return
}
// bind watchers
r.cdsService.BindWatchers(cdsWatchers)
// fetch config
go func() {
for {
switch r.updateConnectionStatus() {
case reporter.ConnectionStatusShutdown:
// the connection has been shut down; stop fetching configuration
return
case reporter.ConnectionStatusDisconnect:
time.Sleep(r.cdsInterval)
continue
}
configurations, err := r.cdsClient.FetchConfigurations(context.Background(), &configuration.ConfigurationSyncRequest{
Service: r.entity.ServiceName,
Uuid: r.cdsService.UUID,
})
if err != nil {
r.logger.Errorf("fetch dynamic configuration error %v", err)
time.Sleep(r.cdsInterval)
continue
}
if len(configurations.GetCommands()) > 0 && configurations.GetCommands()[0].Command == "ConfigurationDiscoveryCommand" {
command := configurations.GetCommands()[0]
r.cdsService.HandleCommand(command)
}
time.Sleep(r.cdsInterval)
}
}()
}
func (r *gRPCReporter) closeTracingStream(stream agentv3.TraceSegmentReportService_CollectClient) {
_, err := stream.CloseAndRecv()
if err != nil && err != io.EOF {
r.logger.Errorf("send closing error %v", err)
}
}
func (r *gRPCReporter) closeMetricsStream(stream agentv3.MeterReportService_CollectBatchClient) {
_, err := stream.CloseAndRecv()
if err != nil && err != io.EOF {
r.logger.Errorf("send closing error %v", err)
}
}
func (r *gRPCReporter) closeLogStream(stream logv3.LogReportService_CollectClient) {
_, err := stream.CloseAndRecv()
if err != nil && err != io.EOF {
r.logger.Errorf("send closing error %v", err)
}
}
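// reportInstanceProperties reports the service instance properties to the management service.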
func (r *gRPCReporter) reportInstanceProperties() (err error) {
_, err = r.managementClient.ReportInstanceProperties(metadata.NewOutgoingContext(context.Background(), r.md), &managementv3.InstanceProperties{
Service: r.entity.ServiceName,
ServiceInstance: r.entity.ServiceInstanceName,
Properties: r.entity.Props,
})
return err
}
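// check reports the instance properties once and then sends periodic keep-alive pings
// to the management service.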
func (r *gRPCReporter) check() {
if r.checkInterval < 0 || r.conn == nil || r.managementClient == nil {
return
}
go func() {
instancePropertiesSubmitted := false
for {
switch r.updateConnectionStatus() {
case reporter.ConnectionStatusShutdown:
// the connection has been shut down; stop the keep-alive loop
return
case reporter.ConnectionStatusDisconnect:
time.Sleep(r.checkInterval)
continue
}
if !instancePropertiesSubmitted {
err := r.reportInstanceProperties()
if err != nil {
r.logger.Errorf("report serviceInstance properties error %v", err)
time.Sleep(r.checkInterval)
continue
}
instancePropertiesSubmitted = true
}
_, err := r.managementClient.KeepAlive(metadata.NewOutgoingContext(context.Background(), r.md), &managementv3.InstancePingPkg{
Service: r.entity.ServiceName,
ServiceInstance: r.entity.ServiceInstanceName,
})
if err != nil {
r.logger.Errorf("send keep alive signal error %v", err)
}
time.Sleep(r.checkInterval)
}
}()
}