in plugins/queue/partition/partitioned_queue.go [120:153]
func (p *PartitionedQueue) findPartition(_ *v1.SniffData) (int, error) {
if p.Partition == 1 {
return 0, nil
}
// increment
var partition int
for {
result := atomic.AddInt32(&p.loadBalancerIndex, 1)
partition = int(result)
if partition < p.Partition {
break
} else if atomic.CompareAndSwapInt32(&p.loadBalancerIndex, result, 0) {
partition = 0
break
}
}
// check partition is full
if !p.subQueues[partition].IsFull() {
return partition, nil
}
for addition := 1; addition < p.Partition; addition++ {
checkPartition := partition + addition
if checkPartition >= p.Partition {
checkPartition -= p.Partition
}
if !p.subQueues[checkPartition].IsFull() {
return checkPartition, nil
}
}
return 0, api.ErrFull
}