func()

in internal/satellite/module/sender/sender.go [225:264]


func (s *Sender) consume(batch *buffer.BatchBuffer) {
	if batch.Len() == 0 {
		return
	}
	log.Logger.WithFields(logrus.Fields{
		"pipe":   s.config.PipeName,
		"offset": batch.Last(),
		"size":   batch.Len(),
	}).Info("sender module is flushing a new batch buffer.")
	var events = make(map[v1.SniffType]event.BatchEvents)
	for i := 0; i < batch.Len(); i++ {
		eventContext := batch.Buf()[i]
		for _, e := range eventContext.Context {
			if e.Remote {
				events[e.Type] = append(events[e.Type], e)
			}
		}
	}
	for _, f := range s.runningForwarders {
		for t, batchEvents := range events {
			if f.ForwardType() != t {
				continue
			}
			if err := f.Forward(batchEvents); err == nil {
				s.sendCounter.Add(float64(len(batchEvents)), s.config.PipeName, "success", f.ForwardType().String())
				continue
			} else {
				log.Logger.WithFields(logrus.Fields{
					"pipe":   s.config.PipeName,
					"offset": batch.Last(),
					"size":   len(batchEvents),
				}).Warnf("forward event failure: %v", err)
			}
			if !s.runningFallbacker.FallBack(batchEvents, f.Forward) {
				s.sendCounter.Add(float64(len(batchEvents)), s.config.PipeName, "failure", f.ForwardType().String())
			}
		}
	}
	s.gatherer.Ack(batch.Last())
}