receiver/stefreceiver/stef.go (117 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package stefreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/stefreceiver"
import (
"context"
"errors"
"net"
stefgrpc "github.com/splunk/stef/go/grpc"
"github.com/splunk/stef/go/grpc/stef_proto"
"github.com/splunk/stef/go/otel/oteltef"
stefpdatametrics "github.com/splunk/stef/go/pdata/metrics"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/stefreceiver/internal"
)
// stefReceiver accepts metrics encoded in the STEF format over gRPC, converts
// them to pdata metrics and forwards them to the next metrics consumer.
type stefReceiver struct {
	// cfg holds the receiver configuration. It supplies the gRPC server
	// settings (ToServer), the listen address (NetAddr) and the ack interval
	// (AckInterval) used for per-stream responders.
	cfg *Config
	// serverGRPC is created in Start and stopped in Shutdown. It is nil until
	// Start has successfully built it.
	serverGRPC *grpc.Server
	// nextMetricsConsumer receives every batch converted from an incoming stream.
	nextMetricsConsumer consumer.Metrics
	// settings provides the logger and telemetry settings for this receiver.
	settings receiver.Settings
	// eg tracks the goroutine running the gRPC Serve loop so that Shutdown can
	// wait for it to exit.
	eg errgroup.Group
}
// Start runs the STEF gRPC receiver.
//
// It builds the gRPC server from the configuration, registers the STEF
// destination service on it, binds the configured endpoint and then serves in
// a background goroutine tracked by r.eg. Each incoming STEF stream is handled
// by onStream.
//
// Errors from creating the server, loading the metrics wire schema or binding
// the listener are returned to the caller. Errors returned by Serve, other than
// grpc.ErrServerStopped (the normal result of Shutdown), are reported to the
// host as a fatal status event.
func (r *stefReceiver) Start(ctx context.Context, host component.Host) error {
	var err error
	if r.serverGRPC, err = r.cfg.ToServer(ctx, host, r.settings.TelemetrySettings); err != nil {
		return err
	}

	r.settings.Logger.Info("Starting GRPC server", zap.String("endpoint", r.cfg.NetAddr.Endpoint))

	schema, err := oteltef.MetricsWireSchema()
	if err != nil {
		return err
	}

	// Bind using the caller's context so that a cancelled Start also aborts the
	// listen call. The context only scopes the bind itself; it does not
	// control the lifetime of the returned listener.
	var gln net.Listener
	if gln, err = r.cfg.NetAddr.Listen(ctx); err != nil {
		return err
	}

	settings := stefgrpc.ServerSettings{
		Logger:       nil,
		ServerSchema: &schema,
		// NOTE(review): 0 presumably selects the library default / no explicit
		// limit — confirm against the stef grpc package.
		MaxDictBytes: 0,
		Callbacks:    stefgrpc.Callbacks{OnStream: r.onStream},
	}
	stefSrv := stefgrpc.NewStreamServer(settings)
	stef_proto.RegisterSTEFDestinationServer(r.serverGRPC, stefSrv)

	r.eg.Go(func() error {
		// Serve blocks until the server is stopped. ErrServerStopped is the
		// expected result of Shutdown and is not a failure.
		if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil && !errors.Is(errGrpc, grpc.ErrServerStopped) {
			componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(errGrpc))
		}
		return nil
	})
	return nil
}
// Shutdown is a method to turn off receiving.
//
// It first attempts a graceful stop, which stops accepting new connections and
// waits for in-flight RPCs to finish. STEF streams are long-lived: onStream
// keeps reading until the client disconnects. A purely graceful stop could
// therefore block forever. If ctx is done before the graceful stop completes,
// the server is force-stopped, which closes all connections and also unblocks
// the pending GracefulStop. Shutdown then waits for the Serve goroutine started
// by Start to exit.
//
// If ctx never expires, the behavior is identical to a plain GracefulStop.
func (r *stefReceiver) Shutdown(ctx context.Context) error {
	if r.serverGRPC != nil {
		stopped := make(chan struct{})
		go func() {
			r.serverGRPC.GracefulStop()
			close(stopped)
		}()

		select {
		case <-stopped:
		case <-ctx.Done():
			// Deadline reached with streams still open: force-close them.
			r.serverGRPC.Stop()
			// Wait for the GracefulStop goroutine so it does not outlive Shutdown.
			<-stopped
		}
	}
	return r.eg.Wait()
}
// onStream serves one incoming STEF/gRPC stream.
//
// It decodes metrics records frame by frame and pushes each converted batch to
// the next metrics consumer. For each batch it schedules either an ack (on
// success) or a bad-data response (on a permanent consumer error). It returns
// when the stream can no longer be used:
//   - the responder has failed to send responses,
//   - reading or decoding the stream failed (including client disconnects), or
//   - the consumer reported a retryable error, which is surfaced to the client
//     as codes.Unavailable.
func (r *stefReceiver) onStream(grpcReader stefgrpc.GrpcReader, stream stefgrpc.STEFStream) error {
	logger := r.settings.Logger
	logger.Debug("Incoming STEF/gRPC connection.")

	reader, err := oteltef.NewMetricsReader(grpcReader)
	if err != nil {
		logger.Error("Cannot decode data on incoming STEF/gRPC connection", zap.Error(err))
		return err
	}

	// Acks and bad-data responses for this stream are delivered by a
	// responder running in its own goroutine. It is stopped when the stream
	// handler returns.
	responder := internal.NewResponder(logger, stream, r.cfg.AckInterval)
	defer responder.Stop()
	go responder.Run()

	converter := stefpdatametrics.STEFToOTLPUnsorted{}

	for {
		// Responding is essential for operation. Once the responder has
		// failed, this connection cannot be used any further.
		if respErr := responder.LastError(); respErr != nil {
			logger.Error(
				"Closing STEF/gRPC connection since responding failed",
				zap.Error(respErr),
			)
			return respErr
		}

		// Convert only up to the end of the current frame. This prevents
		// blocking mid-batch with data held in memory that is neither pushed
		// nor acked.
		firstID := reader.RecordCount()
		batch, readErr := converter.ConvertTillEndOfFrame(reader)
		if readErr != nil {
			if st, ok := status.FromError(readErr); ok && st.Code() == codes.Canceled {
				// The client closed the connection: a regular disconnect.
				logger.Debug("STEF/gRPC connection closed", zap.Error(readErr))
			} else {
				logger.Error("Cannot read from STEF/gRPC connection", zap.Error(readErr))
			}
			return readErr
		}
		lastID := reader.RecordCount()

		consumeErr := r.nextMetricsConsumer.ConsumeMetrics(context.Background(), batch)
		if consumeErr == nil {
			// Received and consumed: acknowledge everything up to lastID.
			responder.ScheduleAck(lastID)
			continue
		}

		logger.Error(
			"Error pushing data to consumer",
			zap.Error(consumeErr),
			zap.Uint64("fromID", firstID),
			zap.Uint64("toID", lastID),
		)
		if !consumererror.IsPermanent(consumeErr) {
			// Temporary failure downstream: close the stream and ask the
			// client to retry later.
			return status.New(codes.Unavailable, "try again later").Err()
		}
		// Permanent failure: report the rejected record range to the client
		// and keep receiving on this stream.
		responder.ScheduleBadDataResponse(internal.BadData{
			FromID: firstID,
			ToID:   lastID,
		})
	}
}