fn make_blobstore_put_ops()

in eden/mononoke/blobstore/factory/src/blobstore.rs [436:646]


fn make_blobstore_put_ops<'a>(
    fb: FacebookInit,
    blobconfig: BlobConfig,
    mysql_options: &'a MysqlOptions,
    readonly_storage: ReadOnlyStorage,
    blobstore_options: &'a BlobstoreOptions,
    logger: &'a Logger,
    config_store: &'a ConfigStore,
    scrub_handler: &'a Arc<dyn ScrubHandler>,
    component_sampler: Option<&'a Arc<dyn ComponentSamplingHandler>>,
    blobstore_id: Option<BlobstoreId>,
) -> BoxFuture<'a, Result<Arc<dyn BlobstorePutOps>, Error>> {
    // NOTE: This needs to return a BoxFuture because it recurses.
    async move {
        use BlobConfig::*;

        let mut needs_wrappers = true;
        let store = match blobconfig {
            // Physical blobstores
            Sqlite { .. } | Mysql { .. } => make_sql_blobstore(
                fb,
                blobconfig,
                readonly_storage,
                blobstore_options,
                config_store,
            )
            .watched(logger)
            .await
            .map(|store| Arc::new(store) as Arc<dyn BlobstorePutOps>)?,
            Manifold { .. } | ManifoldWithTtl { .. } => {
                make_manifold_blobstore(fb, blobconfig, blobstore_options)
                    .watched(logger)
                    .await
                    .map(|store| Arc::new(store) as Arc<dyn BlobstorePutOps>)?
            }
            Files { .. } => make_files_blobstore(blobconfig, blobstore_options)
                .await
                .map(|store| Arc::new(store) as Arc<dyn BlobstorePutOps>)?,
            S3 {
                bucket,
                keychain_group,
                region_name,
                endpoint,
                num_concurrent_operations,
            } => {
                #[cfg(fbcode_build)]
                {
                    ::s3blob::S3Blob::new(
                        fb,
                        bucket,
                        keychain_group,
                        region_name,
                        endpoint,
                        blobstore_options.put_behaviour,
                        logger,
                        num_concurrent_operations,
                    )
                    .watched(logger)
                    .await
                    .context(ErrorKind::StateOpen)
                    .map(|store| Arc::new(store) as Arc<dyn BlobstorePutOps>)?
                }
                #[cfg(not(fbcode_build))]
                {
                    let _ = (
                        bucket,
                        keychain_group,
                        region_name,
                        endpoint,
                        num_concurrent_operations,
                    );
                    unimplemented!("This is implemented only for fbcode_build")
                }
            }

            // Special case
            Disabled => {
                Arc::new(DisabledBlob::new("Disabled by configuration")) as Arc<dyn BlobstorePutOps>
            }

            // Wrapper blobstores
            Multiplexed {
                multiplex_id,
                scuba_table,
                multiplex_scuba_table,
                scuba_sample_rate,
                blobstores,
                minimum_successful_writes,
                not_present_read_quorum,
                queue_db,
            } => {
                needs_wrappers = false;
                make_blobstore_multiplexed(
                    fb,
                    multiplex_id,
                    queue_db,
                    scuba_table,
                    multiplex_scuba_table,
                    scuba_sample_rate,
                    blobstores,
                    minimum_successful_writes,
                    not_present_read_quorum,
                    mysql_options,
                    readonly_storage,
                    blobstore_options,
                    logger,
                    config_store,
                    scrub_handler,
                    component_sampler,
                )
                .watched(logger)
                .await?
            }
            Logging {
                blobconfig,
                scuba_table,
                scuba_sample_rate,
            } => {
                needs_wrappers = false;
                let store = make_blobstore_put_ops(
                    fb,
                    *blobconfig,
                    mysql_options,
                    readonly_storage,
                    &blobstore_options,
                    logger,
                    config_store,
                    scrub_handler,
                    component_sampler,
                    None,
                )
                .watched(logger)
                .await?;

                let scuba = scuba_table
                    .map_or(MononokeScubaSampleBuilder::with_discard(), |table| {
                        MononokeScubaSampleBuilder::new(fb, &table)
                    });
                Arc::new(LogBlob::new(store, scuba, scuba_sample_rate)) as Arc<dyn BlobstorePutOps>
            }
            Pack { .. } => {
                // NB packblob does not apply the wrappers internally
                make_packblob(
                    fb,
                    blobconfig,
                    readonly_storage,
                    blobstore_options,
                    logger,
                    config_store,
                )
                .watched(logger)
                .await
                .map(|store| Arc::new(store) as Arc<dyn BlobstorePutOps>)?
            }
        };

        let store = if needs_wrappers {
            let store = if let Some(component_sampler) = component_sampler {
                Arc::new(SamplingBlobstorePutOps::new(
                    store,
                    blobstore_id,
                    component_sampler.clone(),
                )) as Arc<dyn BlobstorePutOps>
            } else {
                store
            };

            let store = if readonly_storage.0 {
                Arc::new(ReadOnlyBlobstore::new(store)) as Arc<dyn BlobstorePutOps>
            } else {
                store
            };

            let store = if blobstore_options.throttle_options.has_throttle() {
                Arc::new(
                    ThrottledBlob::new(store, blobstore_options.throttle_options)
                        .watched(logger)
                        .await,
                ) as Arc<dyn BlobstorePutOps>
            } else {
                store
            };

            let store = if blobstore_options.chaos_options.has_chaos() {
                Arc::new(ChaosBlobstore::new(store, blobstore_options.chaos_options))
                    as Arc<dyn BlobstorePutOps>
            } else {
                store
            };

            if blobstore_options.delay_options.has_delay() {
                Arc::new(DelayedBlobstore::from_options(
                    store,
                    blobstore_options.delay_options,
                )) as Arc<dyn BlobstorePutOps>
            } else {
                store
            }
        } else {
            // Already applied the wrappers inside the store
            store
        };

        // NOTE: Do not add wrappers here that should only be added once per repository, since this
        // function will get called recursively for each member of a Multiplex! For those, use
        // RepoBlobstore::new instead.

        Ok(store)
    }
    .boxed()
}