pkg/tools/btf/queue.go (116 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package btf import ( "context" "sync" "time" "github.com/cilium/ebpf" ) // queueChannelReducingCountCheckInterval is the interval to check the queue channel reducing count // if the reducing count is almost full, then added a warning log const queueChannelReducingCountCheckInterval = time.Second * 5 type PartitionContext interface { Start(ctx context.Context) Consume(data interface{}) } type EventQueue struct { name string count int receivers []*mapReceiver partitions []*partition startOnce sync.Once } type mapReceiver struct { emap *ebpf.Map perCPUBuffer int dataSupplier func() interface{} router func(data interface{}) int parallels int } func NewEventQueue(name string, partitionCount, sizePerPartition int, contextGenerator func(partitionNum int) PartitionContext) *EventQueue { partitions := make([]*partition, 0) for i := 0; i < partitionCount; i++ { partitions = append(partitions, newPartition(i, sizePerPartition, contextGenerator(i))) } return &EventQueue{name: name, count: partitionCount, partitions: partitions} } func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize, parallels int, dataSupplier func() interface{}, routeGenerator func(data interface{}) int) { e.receivers = append(e.receivers, &mapReceiver{ emap: emap, perCPUBuffer: perCPUBufferSize, dataSupplier: dataSupplier, router: routeGenerator, parallels: parallels, }) } func (e *EventQueue) Start(ctx context.Context, linker *Linker) { e.startOnce.Do(func() { e.start0(ctx, linker) }) } func (e *EventQueue) Push(key int, data interface{}) { // append data e.partitions[key%e.count].channel <- data } func (e *EventQueue) PartitionContexts() []PartitionContext { result := make([]PartitionContext, 0) for _, p := range e.partitions { result = append(result, p.ctx) } return result } func (e *EventQueue) start0(ctx context.Context, linker *Linker) { for _, r := range e.receivers { func(receiver *mapReceiver) { linker.ReadEventAsyncWithBufferSize(receiver.emap, func(data interface{}) { e.routerTransformer(data, receiver.router) }, receiver.perCPUBuffer, r.parallels, receiver.dataSupplier) }(r) } for i := 0; i < len(e.partitions); i++ { go func(ctx context.Context, inx int) { p := e.partitions[inx] p.ctx.Start(ctx) for { select { // consume the data case data := <-p.channel: p.ctx.Consume(data) // shutdown the consumer case <-ctx.Done(): return } } }(ctx, i) } // channel reducing count check go func() { ticker := time.NewTicker(queueChannelReducingCountCheckInterval) defer ticker.Stop() for { select { case <-ticker.C: for _, p := range e.partitions { reducingCount := len(p.channel) if reducingCount > cap(p.channel)*9/10 { log.Warnf("queue %s partition %d reducing count is almost full, "+ "please trying to increase the parallels count or queue size, status: %d/%d", e.name, p.index, reducingCount, cap(p.channel)) } } case <-ctx.Done(): return } } }() } func (e *EventQueue) routerTransformer(data interface{}, routeGenerator func(data interface{}) int) { key := routeGenerator(data) e.Push(key, data) } type partition struct { index int channel chan interface{} ctx PartitionContext } func newPartition(index, size int, ctx PartitionContext) *partition { return &partition{ index: index, channel: make(chan interface{}, size), ctx: ctx, } }