func()

in plugins/queue/partition/partitioned_queue.go [120:153]


func (p *PartitionedQueue) findPartition(_ *v1.SniffData) (int, error) {
	if p.Partition == 1 {
		return 0, nil
	}

	// increment
	var partition int
	for {
		result := atomic.AddInt32(&p.loadBalancerIndex, 1)
		partition = int(result)

		if partition < p.Partition {
			break
		} else if atomic.CompareAndSwapInt32(&p.loadBalancerIndex, result, 0) {
			partition = 0
			break
		}
	}

	// check partition is full
	if !p.subQueues[partition].IsFull() {
		return partition, nil
	}
	for addition := 1; addition < p.Partition; addition++ {
		checkPartition := partition + addition
		if checkPartition >= p.Partition {
			checkPartition -= p.Partition
		}
		if !p.subQueues[checkPartition].IsFull() {
			return checkPartition, nil
		}
	}
	return 0, api.ErrFull
}