receiver/stefreceiver/internal/responder.go (106 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/stefreceiver/internal" import ( "sync/atomic" "time" stefgrpc "github.com/splunk/stef/go/grpc" "github.com/splunk/stef/go/grpc/stef_proto" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) // Responder is responsible for sending responses to the STEF/gRPC client. // There is one Responder per stream. type Responder struct { logger *zap.Logger // Stream to send responses to. stream stefgrpc.STEFStream // Channel to stop the Responder goroutine. stopCh chan struct{} // The next ack ID to send to the client. nextAckID atomic.Uint64 // The last error that occurred while sending responses. lastError atomic.Value // Channel to receive bad data information from. Data written to this channel // will be sent as a response to the client. badDataCh chan BadData // Interval at which the responder sends acks. ackInterval time.Duration } // BadData describes a range of records that were bad. type BadData struct { // The range of records that were bad. FromID<=ToID. // ToID is also equal to the last ID read from STEF stream. FromID, ToID uint64 } // Max number of BadData records that can be accumulated and be sent in a single response. // The most typical number in one response will be 1, larger numbers are very unlikely, so we // don't need a large buffer. const badDataMaxBatchSize = 10 func NewResponder(logger *zap.Logger, stream stefgrpc.STEFStream, ackInterval time.Duration) *Responder { return &Responder{ logger: logger, stream: stream, stopCh: make(chan struct{}), badDataCh: make(chan BadData, badDataMaxBatchSize), ackInterval: ackInterval, } } // ScheduleAck schedules an ack to be sent to the client. The ack will be sent // at the next opportunity in a response to client. If ScheduleAck is called // repeatedly, only the last value will be used in the next response. // It is normally expected that ScheduleAck is called with increasing recordID values. func (r *Responder) ScheduleAck(recordID uint64) { r.nextAckID.Store(recordID) } // ScheduleBadDataResponse schedules BadData record to be sent to the client. // Multiple successive calls may be accumulated in a batch and sent in // one response to the client. func (r *Responder) ScheduleBadDataResponse(badData BadData) { r.badDataCh <- badData } // LastError returns the last error that occurred while sending responses or // nil if there never was an error. func (r *Responder) LastError() error { err := r.lastError.Load() if err == nil { return nil } return err.(error) } // Stop begins stopping the Responder. func (r *Responder) Stop() { close(r.stopCh) } // Run the Responder. Normally called on a separate goroutine and blocks // until stopped by calling Stop(). func (r *Responder) Run() { // Time interval to wait before sending an ack. t := time.NewTicker(r.ackInterval) defer t.Stop() lastAckedID := &atomic.Uint64{} // Preallocate to avoid allocations in the loop. badDataResponse := &stef_proto.STEFDataResponse{ BadDataRecordIdRanges: make([]*stef_proto.STEFIDRange, 0, 8), } ackResponse := &stef_proto.STEFDataResponse{} for { select { case badData := <-r.badDataCh: r.composeBadDataResponse(badDataResponse, badData) if err := r.stream.SendDataResponse(badDataResponse); err != nil { r.logger.Error("Error acking STEF gRPC connection", zap.Error(err)) r.lastError.Store(err) } else { lastAckedID.Store(badDataResponse.AckRecordId) } case <-t.C: readRecordID := r.nextAckID.Load() oldValue := lastAckedID.Swap(readRecordID) if readRecordID > oldValue { ackResponse.AckRecordId = readRecordID if err := r.stream.SendDataResponse(ackResponse); err != nil { st, ok := status.FromError(err) // This is not a regular disconnection case, as in the client closed the connection. if !ok || st.Code() != codes.Canceled { r.logger.Error("Error acking STEF gRPC connection", zap.Error(err)) r.lastError.Store(err) } } } case <-r.stopCh: return } } } func (r *Responder) composeBadDataResponse(response *stef_proto.STEFDataResponse, badData BadData) { response.AckRecordId = badData.ToID // First bad data range. response.BadDataRecordIdRanges = response.BadDataRecordIdRanges[:1] response.BadDataRecordIdRanges[0].FromId = badData.FromID response.BadDataRecordIdRanges[0].ToId = badData.ToID // See if there is more bad data we can report all of it in the same response. for { select { case moreBadData := <-r.badDataCh: // Add a range. response.BadDataRecordIdRanges = append( response.BadDataRecordIdRanges, &stef_proto.STEFIDRange{ FromId: moreBadData.FromID, ToId: moreBadData.ToID, }, ) // Use the last ID value for AckRecordID if response.AckRecordId < moreBadData.ToID { response.AckRecordId = moreBadData.ToID } default: return } } }