takeMessageQueues()

in nodejs/src/producer/PublishingLoadBalancer.ts [41:78]


  takeMessageQueues(excluded: Map<string, Endpoints>, count: number) {
    if (this.#index >= this.#messageQueues.length) {
      this.#index = 0;
    }
    let next = this.#index++;
    const candidates: MessageQueue[] = [];
    const candidateBrokerNames = new Set<string>();

    const size = this.#messageQueues.length;
    for (let i = 0; i < size; i++) {
      const messageQueue = this.#messageQueues[next++ % size];
      const broker = messageQueue.broker;
      const brokerName = broker.name;
      if (!excluded.has(broker.endpoints.facade) && !candidateBrokerNames.has(brokerName)) {
        candidateBrokerNames.add(brokerName);
        candidates.push(messageQueue);
      }
      if (candidates.length >= count) {
        return candidates;
      }
    }
    // If all endpoints are isolated.
    if (candidates.length === 0) {
      for (let i = 0; i < size; i++) {
        const messageQueue = this.#messageQueues[next++ % size];
        const broker = messageQueue.broker;
        const brokerName = broker.name;
        if (!candidateBrokerNames.has(brokerName)) {
          candidateBrokerNames.add(brokerName);
          candidates.push(messageQueue);
        }
        if (candidates.length >= count) {
          return candidates;
        }
      }
    }
    return candidates;
  }