in internal/satellite/module/processor/processor.go [64:90]
func (p *Processor) processPerPartition(ctx context.Context, partition int, wg *sync.WaitGroup) {
go func() {
childCtx, cancel := context.WithCancel(ctx)
defer wg.Done()
for {
select {
// receive the input event from the output channel of the gatherer
case e := <-p.gatherer.OutputDataChannel(partition):
c := &event.OutputEventContext{
Offset: &e.Offset,
Context: make(map[string]*v1.SniffData),
}
c.Put(e.Event)
// processing the event with filters, that put the necessary events to OutputEventContext.
for _, f := range p.runningFilters {
f.Process(c)
}
// send the final context that contains many events to the sender.
p.sender.InputDataChannel(partition) <- c
case <-childCtx.Done():
cancel()
p.Shutdown()
return
}
}
}()
}