func NewDefaultRouter()

in pulsar/default_router.go [38:110]


// NewDefaultRouter builds the default message-routing function for a producer.
//
// The returned function maps a message to a partition index in the range
// [0, numPartitions):
//
//   - With at most one partition it always returns 0.
//   - If the message has a non-empty OrderingKey, or otherwise a non-empty Key, the
//     partition is hashFunc(key) modulo numPartitions.
//   - Otherwise partitions are visited round-robin, starting at a random cursor. When
//     disableBatching is true the cursor advances on every message. When it is false the
//     router sticks to the current partition until more than maxBatchingMessages messages
//     or more than maxBatchingSize payload bytes have been routed to it, or more than
//     maxBatchingDelay has elapsed since the previous switch.
//
// The router state is only accessed through sync/atomic operations, so the returned
// function may be invoked from several goroutines.
func NewDefaultRouter(
	hashFunc func(string) uint32,
	maxBatchingMessages uint,
	maxBatchingSize uint,
	maxBatchingDelay time.Duration,
	disableBatching bool) func(*ProducerMessage, uint32) int {
	state := &defaultRouter{
		currentPartitionCursor: rand.Uint32(),
		lastBatchTimestamp:     time.Now().UnixNano(),
	}

	// The clock is only sampled once every readClockAfterNumMessages messages (a tenth of
	// the message limit). A value of zero means it is sampled on every message.
	readClockAfterNumMessages := uint32(maxBatchingMessages / 10)
	return func(message *ProducerMessage, numPartitions uint32) int {
		if numPartitions <= 1 {
			// When there are no partitions, don't even bother. Zero is handled here as well
			// so that the modulo operations below can never divide by zero.
			return 0
		}

		if len(message.OrderingKey) != 0 {
			// When an OrderingKey is specified, use the hash of that key
			return int(hashFunc(message.OrderingKey) % numPartitions)
		}

		if len(message.Key) != 0 {
			// When a key is specified, use the hash of that key
			return int(hashFunc(message.Key) % numPartitions)
		}

		// If there's no key, we do round-robin across partition. If no batching go to next partition.
		if disableBatching {
			// Note: fetch-and-increment in one atomic step. AddUint32 returns the new value, so
			//       subtracting one yields the cursor this call claimed (uint32 wrap-around
			//       keeps that correct). A plain read followed by a separate atomic add would
			//       be a data race and would let concurrent callers pick the same partition.
			cursor := atomic.AddUint32(&state.currentPartitionCursor, 1) - 1
			return int(cursor % numPartitions)
		}

		// If there's no key, we do round-robin across partition, sticking with a given
		// partition for a certain amount of messages or volume buffered or the max delay to batch is reached so that
		// we ensure having a decent amount of batching of the messages.
		var now int64
		size := uint32(len(message.Payload))
		partitionCursor := atomic.LoadUint32(&state.currentPartitionCursor)
		messageCount := atomic.AddUint32(&state.msgCounter, 1)
		batchSize := atomic.AddUint32(&state.cumulativeBatchSize, size)

		// Note: use greater-than for the threshold check so that we don't route this message to a new partition
		//       before a batch is complete.
		messageCountReached := messageCount > uint32(maxBatchingMessages)
		sizeReached := batchSize > uint32(maxBatchingSize)
		durationReached := false
		if readClockAfterNumMessages == 0 || messageCount%readClockAfterNumMessages == 0 {
			now = time.Now().UnixNano()
			lastBatchTime := atomic.LoadInt64(&state.lastBatchTimestamp)
			durationReached = now-lastBatchTime > maxBatchingDelay.Nanoseconds()
		}
		if messageCountReached || sizeReached || durationReached {
			// Note: CAS to ensure that concurrent go-routines can only move the cursor forward by one so that
			//       partitions are not skipped.
			newCursor := partitionCursor + 1
			if atomic.CompareAndSwapUint32(&state.currentPartitionCursor, partitionCursor, newCursor) {
				// Only the goroutine that won the CAS resets the per-batch counters and timestamp.
				atomic.StoreUint32(&state.msgCounter, 0)
				atomic.StoreUint32(&state.cumulativeBatchSize, 0)
				if now == 0 {
					// The clock was not sampled above for this message; read it now.
					now = time.Now().UnixNano()
				}
				atomic.StoreInt64(&state.lastBatchTimestamp, now)
			}

			// Whether or not this goroutine won the CAS, the message goes to the next partition.
			return int(newCursor % numPartitions)
		}

		return int(partitionCursor % numPartitions)
	}
}