in eden/mononoke/cmds/blobstore_healer/healer.rs [294:480]
/// Builds a future that heals a single blobstore `key` across the multiplex `multiplex_id`.
///
/// `entries` are the sync-queue entries loaded for `key`; each one names a blobstore
/// (`blobstore_id`) that the key was recorded against. The returned future:
///
/// * fetches the blob from the known blobstores named in `entries` (via `fetch_blob`);
/// * `put`s it into every blobstore in `blobstores` that is not named in `entries`, plus any
///   named blobstore that returned no data even though it should hold it;
/// * re-queues entries (via `requeue_partial_heal`) when some heal puts failed, or when
///   `entries` name blobstores that are not present in `blobstores`, so they can be retried.
///
/// Returns `None` when there is nothing to do right now: `entries` is empty, or at least one
/// entry is not older than `healing_deadline`.
///
/// The resulting `HealStats` reports `queue_del` (number of input entries), `queue_add`
/// (number of blobstores re-queued), and `put_success` / `put_failure` (outcome of the heal
/// puts actually performed).
fn heal_blob<'out>(
    ctx: &'out CoreContext,
    sync_queue: &'out dyn BlobstoreSyncQueue,
    blobstores: &'out HashMap<BlobstoreId, Arc<dyn Blobstore>>,
    healing_deadline: DateTime,
    key: String,
    multiplex_id: MultiplexId,
    entries: &[BlobstoreSyncQueueEntry],
) -> Option<impl Future<Output = Result<HealStats>> + 'out> {
    if entries.is_empty() {
        return None;
    }
    // This is needed as we load by key, and a given key may have entries both before and after
    // the deadline. We leave the key rather than re-add to avoid entries always being too new.
    if !entries.iter().all(|e| e.timestamp < healing_deadline) {
        return None;
    }
    let num_entries: usize = entries.len();
    // The first entry's operation key and blob size are reused for any re-queued entries.
    let operation_key = entries[0].operation_key.clone();
    let blob_size = entries[0].blob_size;

    // Split the blobstores named by the entries into ones present in `blobstores` (known)
    // and ones that are not (unknown to this healer).
    let (seen_blobstores, unknown_seen_blobstores): (HashSet<_>, HashSet<_>) =
        entries.iter().partition_map(|entry| {
            let id = entry.blobstore_id;
            if blobstores.contains_key(&id) {
                Either::Left(id)
            } else {
                Either::Right(id)
            }
        });
    // Number of distinct unknown blobstores; each contributes one to `HealStats::queue_add`.
    let num_unknown_stores: usize = unknown_seen_blobstores.len();
    if !unknown_seen_blobstores.is_empty() {
        warn!(
            ctx.logger(),
            "Ignoring unknown blobstores {:?} for key {}", unknown_seen_blobstores, key
        );
    }

    // Known blobstores without an entry for this key still need the blob written to them.
    let mut stores_to_heal: HashSet<BlobstoreId> = blobstores
        .keys()
        .filter(|&id| !seen_blobstores.contains(id))
        .cloned()
        .collect();

    if stores_to_heal.is_empty() || seen_blobstores.is_empty() {
        // All blobstores have been synchronized or all are unknown to be requeued
        return Some(
            async move {
                if !unknown_seen_blobstores.is_empty() {
                    requeue_partial_heal(
                        ctx,
                        sync_queue,
                        &key,
                        unknown_seen_blobstores,
                        multiplex_id,
                        operation_key,
                        blob_size,
                    )
                    .await?;
                }
                Ok(HealStats {
                    queue_del: num_entries,
                    queue_add: num_unknown_stores,
                    put_success: 0,
                    put_failure: 0,
                })
            }
            .left_future(),
        );
    }

    let heal_future = async move {
        let FetchData {
            blob,
            good_sources,
            missing_sources,
        } = fetch_blob(ctx, &blobstores, &key, &seen_blobstores).await?;
        debug!(
            ctx.logger(),
            "Fetched blob size for {} is: {}",
            key,
            blob.len()
        );
        if !missing_sources.is_empty() {
            warn!(
                ctx.logger(),
                "Source Blobstores {:?} of {:?} returned None even though they \
                 should contain data",
                missing_sources,
                seen_blobstores
            );
            // A source that returned no data is healed like any other store lacking the blob.
            stores_to_heal.extend(missing_sources);
        }

        // Put the blob into each store that lacks it, one at a time, recording the outcome.
        // If any puts fail, a good source blobstore_id for that blob is put back on the queue
        // below so the failed destinations are retried later.
        let mut healed_stores: HashSet<BlobstoreId> = HashSet::new();
        let mut unhealed_stores: Vec<BlobstoreId> = Vec::new();
        for bid in stores_to_heal {
            let blobstore = blobstores
                .get(&bid)
                .expect("stores_to_heal contains unknown blobstore?");
            match blobstore.put(ctx, key.clone(), blob.clone()).await {
                Ok(_) => {
                    healed_stores.insert(bid);
                }
                Err(e) => {
                    // Keep the cause of the failure; it is otherwise lost once the store is
                    // only recorded as "unhealed".
                    warn!(
                        ctx.logger(),
                        "Failed to heal blobstore {:?} for key {}: {:?}", bid, key, e
                    );
                    unhealed_stores.push(bid);
                }
            }
        }

        if unhealed_stores.is_empty() && unknown_seen_blobstores.is_empty() {
            // Every heal put succeeded and there are no unknown blobstores to preserve.
            Ok(HealStats {
                queue_del: num_entries,
                queue_add: num_unknown_stores,
                put_success: healed_stores.len(),
                put_failure: unhealed_stores.len(),
            })
        } else {
            // Capture put outcomes before the requeue set is widened below, so that
            // `put_success` counts only puts that were actually performed.
            let put_success = healed_stores.len();
            let put_failure = unhealed_stores.len();

            // Add good_sources to the requeue set as we should write all
            // known good blobstores so that the stores_to_heal logic run on read
            // has the full data for the blobstore_key
            //
            // This also ensures we requeue at least one good source store in the case
            // where all heal puts fail
            healed_stores.extend(good_sources);
            let heal_stats = HealStats {
                queue_del: num_entries,
                queue_add: healed_stores.len() + num_unknown_stores,
                put_success,
                put_failure,
            };
            // Add unknown stores to queue as well so we try them later
            healed_stores.extend(unknown_seen_blobstores);
            unhealed_stores.sort();
            warn!(
                ctx.logger(),
                "Adding source blobstores {:?} to the queue so that failed \
                 destination blob stores {:?} will be retried later for {:?}",
                healed_stores.iter().sorted().collect::<Vec<_>>(),
                unhealed_stores,
                key,
            );
            requeue_partial_heal(
                ctx,
                sync_queue,
                &key,
                healed_stores,
                multiplex_id,
                operation_key,
                blob_size,
            )
            .await?;
            Ok(heal_stats)
        }
    };
    Some(heal_future.right_future())
}