in sdk/eventhubs/azure_messaging_eventhubs/src/event_processor/load_balancer.rs [262:297]
/// Builds the list of ownerships this client should hold, claiming as many
/// partitions as it is allowed in a single pass.
///
/// Starts from a copy of `load_balancer_info.current_ownership`, tops it up
/// from `unowned_or_expired`, and, if the total is still below
/// `max_allowed`, tops it up again from `above_max`. Every ownership in the
/// resulting list is then passed through `reset_ownership` before being
/// returned.
///
/// # Errors
///
/// Propagates any error returned by `get_random_ownerships`.
fn greedy_load_balancer(
    &self,
    load_balancer_info: &LoadBalancerInfo,
) -> Result<Vec<Ownership>> {
    let max_allowed = load_balancer_info.max_allowed;
    let mut ours = load_balancer_info.current_ownership.clone();
    debug!(
        "[{}]Greedy load balancer. ownership for client ID: {}",
        self.consumer_client_details.client_id,
        Self::partitions_for_ownership(&ours)
    );
    debug!(
        "UnownedOrExpired: {}",
        Self::partitions_for_ownership(&load_balancer_info.unowned_or_expired)
    );

    // Saturate instead of subtracting directly: if we already hold more than
    // `max_allowed`, a plain `usize` subtraction would panic in debug builds
    // and wrap to a huge request count in release builds.
    let wanted = max_allowed.saturating_sub(ours.len());
    // NOTE(review): `get_random_ownerships` presumably returns at most
    // `wanted` entries picked at random from the slice - confirm.
    let claimed =
        self.get_random_ownerships(&load_balancer_info.unowned_or_expired, wanted)?;
    ours.extend(claimed);

    if ours.len() < max_allowed {
        // Guarded by the condition above, so this subtraction cannot underflow.
        let shortfall = max_allowed - ours.len();
        debug!("Not enough expired or unowned partitions, will need to steal from other processors. Stealing up to {} partitions.",
            shortfall);
        debug!("Stealing from {:?}", load_balancer_info.above_max);
        let stolen = self.get_random_ownerships(&load_balancer_info.above_max, shortfall)?;
        ours.extend(stolen);
    }

    // Applied to every entry, including the ones we already owned on entry.
    for ownership in ours.iter_mut() {
        self.reset_ownership(ownership);
    }
    Ok(ours)
}