fn greedy_load_balancer()

in sdk/eventhubs/azure_messaging_eventhubs/src/event_processor/load_balancer.rs [262:297]


    fn greedy_load_balancer(
        &self,
        load_balancer_info: &LoadBalancerInfo,
    ) -> Result<Vec<Ownership>> {
        let mut ours = load_balancer_info.current_ownership.clone();
        debug!(
            "[{}]Greedy load balancer. ownership for client ID: {}",
            self.consumer_client_details.client_id,
            Self::partitions_for_ownership(&ours)
        );
        debug!(
            "UnownedOrExpired: {}",
            Self::partitions_for_ownership(&load_balancer_info.unowned_or_expired)
        );
        let mut random_ownerships = self.get_random_ownerships(
            &load_balancer_info.unowned_or_expired,
            load_balancer_info.max_allowed - ours.len(),
        )?;
        ours.append(&mut random_ownerships);

        if ours.len() < load_balancer_info.max_allowed {
            debug!("Not enough expired or unowned partitions, will need to steal from other processors. Stealing up to {} partitions.",
                load_balancer_info.max_allowed - ours.len());
            debug!("Stealing from {:?}", load_balancer_info.above_max);
            random_ownerships = self.get_random_ownerships(
                &load_balancer_info.above_max,
                load_balancer_info.max_allowed - ours.len(),
            )?;
            ours.append(&mut random_ownerships);
        }

        for ownership in ours.iter_mut() {
            self.reset_ownership(ownership);
        }
        Ok(ours)
    }