func()

in pkg/tools/btf/queue.go [92:138]


// start0 starts the moving parts of the queue, in this order:
//
//  1. every receiver's map is handed to linker.ReadEventAsyncWithBufferSize
//     together with a callback that forwards each item to
//     e.routerTransformer along with that receiver's router;
//  2. one goroutine per partition is launched, which calls Start on the
//     partition context and then passes everything read from the partition
//     channel to Consume;
//  3. one watchdog goroutine is launched, which on every tick of
//     queueChannelReducingCountCheckInterval logs a warning for each
//     partition whose channel is more than 9/10 full.
//
// The goroutines from steps 2 and 3 return once ctx is done. Items still
// sitting in a partition channel at that moment are not consumed here.
func (e *EventQueue) start0(ctx context.Context, linker *Linker) {
	// Each iteration takes its own copy of the loop variable so the callback
	// keeps referring to the receiver it was created for.
	for _, r := range e.receivers {
		receiver := r
		forward := func(data interface{}) {
			e.routerTransformer(data, receiver.router)
		}
		linker.ReadEventAsyncWithBufferSize(receiver.emap, forward,
			receiver.perCPUBuffer, receiver.parallels, receiver.dataSupplier)
	}

	// consume is the body of a partition worker: it starts the partition
	// context and then drains the partition channel until ctx is done.
	consume := func(inx int) {
		part := e.partitions[inx]
		part.ctx.Start(ctx)
		done := ctx.Done()
		for {
			select {
			case item := <-part.channel:
				part.ctx.Consume(item)
			case <-done:
				return
			}
		}
	}
	for inx := range e.partitions {
		go consume(inx)
	}

	// Watchdog: periodically report partitions whose channel is close to
	// capacity so operators can raise the parallelism or the queue size.
	go func() {
		ticker := time.NewTicker(queueChannelReducingCountCheckInterval)
		defer ticker.Stop()
		done := ctx.Done()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
			}

			for _, part := range e.partitions {
				used, capacity := len(part.channel), cap(part.channel)
				if used <= capacity*9/10 {
					continue
				}
				log.Warnf("queue %s partition %d reducing count is almost full, "+
					"please trying to increase the parallels count or queue size, status: %d/%d",
					e.name, part.index, used, capacity)
			}
		}
	}()
}