in pkg/tools/btf/queue.go [92:138]
func (e *EventQueue) start0(ctx context.Context, linker *Linker) {
for _, r := range e.receivers {
func(receiver *mapReceiver) {
linker.ReadEventAsyncWithBufferSize(receiver.emap, func(data interface{}) {
e.routerTransformer(data, receiver.router)
}, receiver.perCPUBuffer, r.parallels, receiver.dataSupplier)
}(r)
}
for i := 0; i < len(e.partitions); i++ {
go func(ctx context.Context, inx int) {
p := e.partitions[inx]
p.ctx.Start(ctx)
for {
select {
// consume the data
case data := <-p.channel:
p.ctx.Consume(data)
// shutdown the consumer
case <-ctx.Done():
return
}
}
}(ctx, i)
}
// channel reducing count check
go func() {
ticker := time.NewTicker(queueChannelReducingCountCheckInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
for _, p := range e.partitions {
reducingCount := len(p.channel)
if reducingCount > cap(p.channel)*9/10 {
log.Warnf("queue %s partition %d reducing count is almost full, "+
"please trying to increase the parallels count or queue size, status: %d/%d",
e.name, p.index, reducingCount, cap(p.channel))
}
}
case <-ctx.Done():
return
}
}
}()
}