in internal/satellite/module/sender/sender.go [224:263]
// consume forwards the remote events held in batch to the running forwarders
// and then acknowledges the batch's last offset to the gatherer.
//
// Events flagged Remote are grouped by their Type, and each running forwarder
// is handed the group whose type equals its ForwardType. When Forward returns
// an error, the failure is logged and the group is passed to the running
// fallbacker together with the forwarder's Forward function; the "failure"
// counter is increased only when the fallbacker reports false. An empty batch
// is ignored and is not acknowledged.
func (s *Sender) consume(batch *buffer.BatchBuffer) {
	if batch.Len() == 0 {
		return
	}
	log.Logger.WithFields(logrus.Fields{
		"pipe":   s.config.PipeName,
		"offset": batch.Last(),
		"size":   batch.Len(),
	}).Info("sender module is flushing a new batch buffer.")

	// Group the remote events by type so that each forwarder receives a single
	// batch. Only the first batch.Len() entries of the buffer are visited.
	events := make(map[v1.SniffType]event.BatchEvents)
	for i := 0; i < batch.Len(); i++ {
		for _, e := range batch.Buf()[i].Context {
			if e.Remote {
				events[e.Type] = append(events[e.Type], e)
			}
		}
	}

	for _, f := range s.runningForwarders {
		// Map keys are unique, so a direct lookup selects the same group that a
		// scan over every key would find.
		forwardType := f.ForwardType()
		batchEvents, ok := events[forwardType]
		if !ok {
			continue
		}

		err := f.Forward(batchEvents)
		if err == nil {
			s.sendCounter.Add(float64(len(batchEvents)), s.config.PipeName, "success", forwardType.String())
			continue
		}
		log.Logger.WithFields(logrus.Fields{
			"pipe":   s.config.PipeName,
			"offset": batch.Last(),
			"size":   len(batchEvents),
		}).Warnf("forward event failure: %v", err)

		// NOTE(review): a fallback that reports true records no counter sample;
		// confirm that this is intended.
		if !s.runningFallbacker.FallBack(batchEvents, f.Forward) {
			s.sendCounter.Add(float64(len(batchEvents)), s.config.PipeName, "failure", forwardType.String())
		}
	}

	// The offset is acknowledged regardless of the forwarding outcome.
	s.gatherer.Ack(batch.Last())
}