in plugins/core/reporter/grpc/grpc.go [302:396]
// initSendPipeline starts the background goroutines that forward queued
// data to the backend over gRPC client streams: one for tracing segments
// (r.tracingSendCh), one for meter data (r.metricsSendCh) and one for logs
// (r.logSendCh). It is a no-op when r.traceClient is nil.
//
// Every goroutine runs the same loop:
//
//  1. Check r.updateConnectionStatus(). On ConnectionStatusShutdown the
//     goroutine terminates; on ConnectionStatusDisconnect it waits for
//     retryInterval and checks again.
//  2. Open a client stream whose outgoing context carries r.md. If opening
//     fails, log the error, wait for retryInterval and start over.
//  3. Send every item received from the channel. If a send fails, log the
//     error, close the stream and start over with a fresh stream; the item
//     whose send failed is not re-queued.
//  4. Once the channel has been closed and drained, close the stream and
//     terminate. The tracing goroutine additionally calls r.closeGRPCConn().
func (r *gRPCReporter) initSendPipeline() {
	if r.traceClient == nil {
		return
	}

	// retryInterval is the pause before re-checking a disconnected
	// connection or retrying a failed stream open.
	const retryInterval = 5 * time.Second

	// Tracing segments.
	go func() {
	StreamLoop:
		for {
			switch r.updateConnectionStatus() {
			case reporter.ConnectionStatusShutdown:
				// A bare "break" would only leave the switch and fall through
				// to opening a stream; the label is required to stop the loop.
				break StreamLoop
			case reporter.ConnectionStatusDisconnect:
				time.Sleep(retryInterval)
				continue StreamLoop
			}
			stream, err := r.traceClient.Collect(metadata.NewOutgoingContext(context.Background(), r.md))
			if err != nil {
				r.logger.Errorf("open stream error %v", err)
				time.Sleep(retryInterval)
				continue StreamLoop
			}
			for s := range r.tracingSendCh {
				err = stream.Send(s)
				if err != nil {
					r.logger.Errorf("send segment error %v", err)
					r.closeTracingStream(stream)
					continue StreamLoop
				}
			}
			// The channel was closed: finish the stream and release the
			// connection, then stop.
			r.closeTracingStream(stream)
			r.closeGRPCConn()
			break
		}
	}()

	// Meter data; each received item is wrapped in a MeterDataCollection.
	go func() {
	StreamLoop:
		for {
			switch r.updateConnectionStatus() {
			case reporter.ConnectionStatusShutdown:
				// Must name the loop; a bare "break" only exits the switch.
				break StreamLoop
			case reporter.ConnectionStatusDisconnect:
				time.Sleep(retryInterval)
				continue StreamLoop
			}
			stream, err := r.metricsClient.CollectBatch(metadata.NewOutgoingContext(context.Background(), r.md))
			if err != nil {
				r.logger.Errorf("open stream error %v", err)
				time.Sleep(retryInterval)
				continue StreamLoop
			}
			for s := range r.metricsSendCh {
				err = stream.Send(&agentv3.MeterDataCollection{
					MeterData: s,
				})
				if err != nil {
					r.logger.Errorf("send metrics error %v", err)
					r.closeMetricsStream(stream)
					continue StreamLoop
				}
			}
			// The channel was closed: finish the stream and stop.
			r.closeMetricsStream(stream)
			break
		}
	}()

	// Logs.
	go func() {
	StreamLoop:
		for {
			switch r.updateConnectionStatus() {
			case reporter.ConnectionStatusShutdown:
				// Must name the loop; a bare "break" only exits the switch.
				break StreamLoop
			case reporter.ConnectionStatusDisconnect:
				time.Sleep(retryInterval)
				continue StreamLoop
			}
			stream, err := r.logClient.Collect(metadata.NewOutgoingContext(context.Background(), r.md))
			if err != nil {
				r.logger.Errorf("open stream error %v", err)
				time.Sleep(retryInterval)
				continue StreamLoop
			}
			for s := range r.logSendCh {
				err = stream.Send(s)
				if err != nil {
					r.logger.Errorf("send log error %v", err)
					r.closeLogStream(stream)
					continue StreamLoop
				}
			}
			// The channel was closed: finish the stream and stop.
			r.closeLogStream(stream)
			break
		}
	}()
}