fn heal_blob()

in eden/mononoke/cmds/blobstore_healer/healer.rs [294:480]


/// Builds the future that heals a single blob `key`, based on the sync-queue
/// `entries` recorded for it.
///
/// Returns `None` (nothing is scheduled) when `entries` is empty, or when any
/// entry has a timestamp that is not strictly before `healing_deadline`.
///
/// Otherwise the returned future:
/// * splits the blobstore ids named by `entries` into those present in
///   `blobstores` ("seen") and those that are not ("unknown");
/// * if every configured store has been seen, or no seen store is known, only
///   hands the unknown ids (if any) to `requeue_partial_heal`;
/// * else fetches the blob via `fetch_blob`, puts it to every configured store
///   that was not seen (plus any seen store reported in `missing_sources`),
///   and, when a put failed or unknown ids exist, hands the successfully
///   written stores, the good sources and the unknown ids to
///   `requeue_partial_heal`.
///
/// The resulting `HealStats` carries `queue_del` (number of `entries`),
/// `queue_add` (number of stores passed to `requeue_partial_heal`),
/// `put_success` and `put_failure` (outcomes of the puts issued here).
///
/// # Errors
/// The future fails if `fetch_blob` or `requeue_partial_heal` fails. Individual
/// put failures are not errors; they are counted in `put_failure`.
///
/// # Panics
/// The future panics if a store selected for healing is absent from
/// `blobstores`.
fn heal_blob<'out>(
    ctx: &'out CoreContext,
    sync_queue: &'out dyn BlobstoreSyncQueue,
    blobstores: &'out HashMap<BlobstoreId, Arc<dyn Blobstore>>,
    healing_deadline: DateTime,
    key: String,
    multiplex_id: MultiplexId,
    entries: &[BlobstoreSyncQueueEntry],
) -> Option<impl Future<Output = Result<HealStats>> + 'out> {
    // Nothing to do for an empty batch.
    let first_entry = entries.first()?;

    // This is needed as we load by key, and a given key may have entries both before and after
    // the deadline.  We leave the key rather than re-add to avoid entries always being too new.
    if !entries.iter().all(|e| e.timestamp < healing_deadline) {
        return None;
    }

    let num_entries: usize = entries.len();

    // The first entry supplies the operation key and blob size used for any requeue.
    let operation_key = first_entry.operation_key.clone();
    let blob_size = first_entry.blob_size;

    let (seen_blobstores, unknown_seen_blobstores): (HashSet<_>, HashSet<_>) =
        entries.iter().partition_map(|entry| {
            let id = entry.blobstore_id;
            if blobstores.contains_key(&id) {
                Either::Left(id)
            } else {
                Either::Right(id)
            }
        });

    let num_unknown_entries: usize = unknown_seen_blobstores.len();

    if !unknown_seen_blobstores.is_empty() {
        warn!(
            ctx.logger(),
            "Ignoring unknown blobstores {:?} for key {}", unknown_seen_blobstores, key
        );
    }

    // Every configured store without a queue entry for this key is a heal target.
    let mut stores_to_heal: HashSet<BlobstoreId> = blobstores
        .keys()
        .filter(|id| !seen_blobstores.contains(*id))
        .copied()
        .collect();

    if stores_to_heal.is_empty() || seen_blobstores.is_empty() {
        // All blobstores have been synchronized or all are unknown to be requeued
        return Some(
            async move {
                if !unknown_seen_blobstores.is_empty() {
                    requeue_partial_heal(
                        ctx,
                        sync_queue,
                        &key,
                        unknown_seen_blobstores,
                        multiplex_id,
                        operation_key,
                        blob_size,
                    )
                    .await?;
                }
                Ok(HealStats {
                    queue_del: num_entries,
                    queue_add: num_unknown_entries,
                    put_success: 0,
                    put_failure: 0,
                })
            }
            .left_future(),
        );
    }

    let heal_future = async move {
        let FetchData {
            blob,
            good_sources,
            missing_sources,
        } = fetch_blob(ctx, &blobstores, &key, &seen_blobstores).await?;

        debug!(
            ctx.logger(),
            "Fetched blob size for {} is: {}",
            key,
            blob.len()
        );

        if !missing_sources.is_empty() {
            warn!(
                ctx.logger(),
                "Source Blobstores {:?} of {:?} returned None even though they \
                 should contain data",
                missing_sources,
                seen_blobstores
            );
            // Sources that should have had the blob but did not are healed too.
            stores_to_heal.extend(missing_sources);
        }

        // If any puts fail make sure we put a good source blobstore_id for that blob
        // back on the queue
        //
        // Puts are awaited one at a time; each outcome is sorted straight into
        // the healed / unhealed collection.
        let mut healed_stores: HashSet<BlobstoreId> = HashSet::new();
        let mut unhealed_stores: Vec<BlobstoreId> = Vec::new();
        for bid in stores_to_heal {
            let blobstore = blobstores
                .get(&bid)
                .expect("stores_to_heal contains unknown blobstore?");
            if blobstore.put(ctx, key.clone(), blob.clone()).await.is_ok() {
                healed_stores.insert(bid);
            } else {
                unhealed_stores.push(bid);
            }
        }

        // Snapshot the put outcomes now: `healed_stores` is extended below with
        // stores that were only read from, which must not count as puts.
        let put_success = healed_stores.len();
        let put_failure = unhealed_stores.len();

        if !unhealed_stores.is_empty() || !unknown_seen_blobstores.is_empty() {
            // Add good_sources to the healed_stores as we should write all
            // known good blobstores so that the stores_to_heal logic run on read
            // has the full data for the blobstore_key
            //
            // This also ensures we requeue at least one good source store in the case
            // where all heal puts fail
            healed_stores.extend(good_sources);

            let heal_stats = HealStats {
                queue_del: num_entries,
                queue_add: healed_stores.len() + num_unknown_entries,
                put_success,
                put_failure,
            };

            // Add unknown stores to queue as well so we try them later
            healed_stores.extend(unknown_seen_blobstores);
            unhealed_stores.sort();
            warn!(
                ctx.logger(),
                "Adding source blobstores {:?} to the queue so that failed \
                     destination blob stores {:?} will be retried later for {:?}",
                healed_stores.iter().sorted().collect::<Vec<_>>(),
                unhealed_stores,
                key,
            );
            requeue_partial_heal(
                ctx,
                sync_queue,
                &key,
                healed_stores,
                multiplex_id,
                operation_key,
                blob_size,
            )
            .await?;
            Ok(heal_stats)
        } else {
            // Every put succeeded and there is nothing unknown: nothing is requeued here.
            Ok(HealStats {
                queue_del: num_entries,
                queue_add: num_unknown_entries,
                put_success,
                put_failure,
            })
        }
    };

    Some(heal_future.right_future())
}