receiver/carbonreceiver/internal/transport/tcp_server.go (157 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/internal/transport" import ( "bufio" "context" "errors" "io" "net" "strings" "sync" "time" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol" ) type tcpServer struct { ln net.Listener wg sync.WaitGroup idleTimeout time.Duration reporter Reporter } var _ Server = (*tcpServer)(nil) // NewTCPServer creates a transport.Server using TCP as its transport. func NewTCPServer( addr string, idleTimeout time.Duration, ) (Server, error) { ln, err := net.Listen("tcp", addr) if err != nil { return nil, err } t := tcpServer{ ln: ln, idleTimeout: idleTimeout, } return &t, nil } func (t *tcpServer) ListenAndServe( parser protocol.Parser, nextConsumer consumer.Metrics, reporter Reporter, ) error { if parser == nil || nextConsumer == nil || reporter == nil { return errNilListenAndServeParameters } acceptedConnMap := make(map[net.Conn]struct{}) connMapMtx := &sync.Mutex{} t.reporter = reporter var err error for { conn, acceptErr := t.ln.Accept() if acceptErr == nil { connMapMtx.Lock() acceptedConnMap[conn] = struct{}{} connMapMtx.Unlock() t.wg.Add(1) go func(c net.Conn) { t.handleConnection(parser, nextConsumer, c) connMapMtx.Lock() delete(acceptedConnMap, c) connMapMtx.Unlock() t.wg.Done() }(conn) continue } var netErr net.Error if errors.As(acceptErr, &netErr) { t.reporter.OnDebugf( "TCP Transport (%s) - Accept (temporary=%v) net.Error: %v", t.ln.Addr().String(), netErr.Timeout(), netErr) if netErr.Timeout() { continue } } err = acceptErr break } t.reporter.OnDebugf( "TCP Transport (%s) exiting Accept loop error: %v", t.ln.Addr().String(), err) // Close any lingering connection connMapMtx.Lock() for conn := range acceptedConnMap { conn.Close() } connMapMtx.Unlock() return err } func (t *tcpServer) Close() error { err := t.ln.Close() t.wg.Wait() return err } func (t *tcpServer) handleConnection( p protocol.Parser, nextConsumer consumer.Metrics, conn net.Conn, ) { defer conn.Close() reader := bufio.NewReader(conn) reporterActive := false var ctx context.Context for { if err := conn.SetDeadline(time.Now().Add(t.idleTimeout)); err != nil { t.reporter.OnDebugf( "TCP Transport (%s) - conn.SetDeadLine error: %v", t.ln.Addr(), err) return } // reader.ReadBytes call below will block until either: // // * a '\n' char is read // * the connection is closed (either by client or server) // * an idle timeout happens (see call to conn.SetDeadline above) // // Notice that it is possible for the function to return with error at // the same time that it returns data (typically the error is io.EOF in // this case). bytes, err := reader.ReadBytes((byte)('\n')) var numReceivedMetricPoints int line := strings.TrimSpace(string(bytes)) if line != "" { if !reporterActive { ctx = t.reporter.OnDataReceived(context.Background()) reporterActive = true } numReceivedMetricPoints++ var metric pmetric.Metric metric, err = p.Parse(line) if err != nil { t.reporter.OnTranslationError(ctx, err) continue } metrics := pmetric.NewMetrics() newMetric := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() metric.MoveTo(newMetric) err = nextConsumer.ConsumeMetrics(ctx, metrics) t.reporter.OnMetricsProcessed(ctx, numReceivedMetricPoints, err) reporterActive = false if err != nil { // The protocol doesn't account for returning errors. // Since this is a TCP connection it seems reasonable to close the // connection as a way to report "error" back to client and minimize // the effect of a client constantly submitting bad data. return } } netErr := &net.OpError{} if errors.As(err, &netErr) { t.reporter.OnDebugf("TCP Transport (%s) - net.OpError: %v", t.ln.Addr(), netErr) if netErr.Timeout() { // We want to end on timeout so idle connections are purged. if reporterActive { t.reporter.OnMetricsProcessed(ctx, 0, err) } return } } if errors.Is(err, io.EOF) { t.reporter.OnDebugf( "TCP Transport (%s) - error: %v", t.ln.Addr(), err) if reporterActive { t.reporter.OnMetricsProcessed(ctx, 0, err) } return } } }