func()

in plugins/core/reporter/grpc/grpc.go [301:395]


func (r *gRPCReporter) initSendPipeline() {
	if r.traceClient == nil {
		return
	}
	go func() {
	StreamLoop:
		for {
			switch r.updateConnectionStatus() {
			case reporter.ConnectionStatusShutdown:
				break
			case reporter.ConnectionStatusDisconnect:
				time.Sleep(5 * time.Second)
				continue StreamLoop
			}

			stream, err := r.traceClient.Collect(metadata.NewOutgoingContext(context.Background(), r.md))
			if err != nil {
				r.logger.Errorf("open stream error %v", err)
				time.Sleep(5 * time.Second)
				continue StreamLoop
			}
			for s := range r.tracingSendCh {
				err = stream.Send(s)
				if err != nil {
					r.logger.Errorf("send segment error %v", err)
					r.closeTracingStream(stream)
					continue StreamLoop
				}
			}
			r.closeTracingStream(stream)
			r.closeGRPCConn()
			break
		}
	}()
	go func() {
	StreamLoop:
		for {
			switch r.updateConnectionStatus() {
			case reporter.ConnectionStatusShutdown:
				break
			case reporter.ConnectionStatusDisconnect:
				time.Sleep(5 * time.Second)
				continue StreamLoop
			}

			stream, err := r.metricsClient.CollectBatch(metadata.NewOutgoingContext(context.Background(), r.md))
			if err != nil {
				r.logger.Errorf("open stream error %v", err)
				time.Sleep(5 * time.Second)
				continue StreamLoop
			}
			for s := range r.metricsSendCh {
				err = stream.Send(&agentv3.MeterDataCollection{
					MeterData: s,
				})
				if err != nil {
					r.logger.Errorf("send metrics error %v", err)
					r.closeMetricsStream(stream)
					continue StreamLoop
				}
			}
			r.closeMetricsStream(stream)
			break
		}
	}()
	go func() {
	StreamLoop:
		for {
			switch r.updateConnectionStatus() {
			case reporter.ConnectionStatusShutdown:
				break
			case reporter.ConnectionStatusDisconnect:
				time.Sleep(5 * time.Second)
				continue StreamLoop
			}

			stream, err := r.logClient.Collect(metadata.NewOutgoingContext(context.Background(), r.md))
			if err != nil {
				r.logger.Errorf("open stream error %v", err)
				time.Sleep(5 * time.Second)
				continue StreamLoop
			}
			for s := range r.logSendCh {
				err = stream.Send(s)
				if err != nil {
					r.logger.Errorf("send log error %v", err)
					r.closeLogStream(stream)
					continue StreamLoop
				}
			}
			r.closeLogStream(stream)
			break
		}
	}()
}