func()

in internal/satellite/module/processor/processor.go [64:90]


func (p *Processor) processPerPartition(ctx context.Context, partition int, wg *sync.WaitGroup) {
	go func() {
		childCtx, cancel := context.WithCancel(ctx)
		defer wg.Done()
		for {
			select {
			// receive the input event from the output channel of the gatherer
			case e := <-p.gatherer.OutputDataChannel(partition):
				c := &event.OutputEventContext{
					Offset:  &e.Offset,
					Context: make(map[string]*v1.SniffData),
				}
				c.Put(e.Event)
				// processing the event with filters, that put the necessary events to OutputEventContext.
				for _, f := range p.runningFilters {
					f.Process(c)
				}
				// send the final context that contains many events to the sender.
				p.sender.InputDataChannel(partition) <- c
			case <-childCtx.Done():
				cancel()
				p.Shutdown()
				return
			}
		}
	}()
}