in eden/mononoke/blobstore/factory/src/blobstore.rs [436:646]
fn make_blobstore_put_ops<'a>(
fb: FacebookInit,
blobconfig: BlobConfig,
mysql_options: &'a MysqlOptions,
readonly_storage: ReadOnlyStorage,
blobstore_options: &'a BlobstoreOptions,
logger: &'a Logger,
config_store: &'a ConfigStore,
scrub_handler: &'a Arc<dyn ScrubHandler>,
component_sampler: Option<&'a Arc<dyn ComponentSamplingHandler>>,
blobstore_id: Option<BlobstoreId>,
) -> BoxFuture<'a, Result<Arc<dyn BlobstorePutOps>, Error>> {
// NOTE: This needs to return a BoxFuture because it recurses.
async move {
use BlobConfig::*;
let mut needs_wrappers = true;
let store = match blobconfig {
// Physical blobstores
Sqlite { .. } | Mysql { .. } => make_sql_blobstore(
fb,
blobconfig,
readonly_storage,
blobstore_options,
config_store,
)
.watched(logger)
.await
.map(|store| Arc::new(store) as Arc<dyn BlobstorePutOps>)?,
Manifold { .. } | ManifoldWithTtl { .. } => {
make_manifold_blobstore(fb, blobconfig, blobstore_options)
.watched(logger)
.await
.map(|store| Arc::new(store) as Arc<dyn BlobstorePutOps>)?
}
Files { .. } => make_files_blobstore(blobconfig, blobstore_options)
.await
.map(|store| Arc::new(store) as Arc<dyn BlobstorePutOps>)?,
S3 {
bucket,
keychain_group,
region_name,
endpoint,
num_concurrent_operations,
} => {
#[cfg(fbcode_build)]
{
::s3blob::S3Blob::new(
fb,
bucket,
keychain_group,
region_name,
endpoint,
blobstore_options.put_behaviour,
logger,
num_concurrent_operations,
)
.watched(logger)
.await
.context(ErrorKind::StateOpen)
.map(|store| Arc::new(store) as Arc<dyn BlobstorePutOps>)?
}
#[cfg(not(fbcode_build))]
{
let _ = (
bucket,
keychain_group,
region_name,
endpoint,
num_concurrent_operations,
);
unimplemented!("This is implemented only for fbcode_build")
}
}
// Special case
Disabled => {
Arc::new(DisabledBlob::new("Disabled by configuration")) as Arc<dyn BlobstorePutOps>
}
// Wrapper blobstores
Multiplexed {
multiplex_id,
scuba_table,
multiplex_scuba_table,
scuba_sample_rate,
blobstores,
minimum_successful_writes,
not_present_read_quorum,
queue_db,
} => {
needs_wrappers = false;
make_blobstore_multiplexed(
fb,
multiplex_id,
queue_db,
scuba_table,
multiplex_scuba_table,
scuba_sample_rate,
blobstores,
minimum_successful_writes,
not_present_read_quorum,
mysql_options,
readonly_storage,
blobstore_options,
logger,
config_store,
scrub_handler,
component_sampler,
)
.watched(logger)
.await?
}
Logging {
blobconfig,
scuba_table,
scuba_sample_rate,
} => {
needs_wrappers = false;
let store = make_blobstore_put_ops(
fb,
*blobconfig,
mysql_options,
readonly_storage,
&blobstore_options,
logger,
config_store,
scrub_handler,
component_sampler,
None,
)
.watched(logger)
.await?;
let scuba = scuba_table
.map_or(MononokeScubaSampleBuilder::with_discard(), |table| {
MononokeScubaSampleBuilder::new(fb, &table)
});
Arc::new(LogBlob::new(store, scuba, scuba_sample_rate)) as Arc<dyn BlobstorePutOps>
}
Pack { .. } => {
// NB packblob does not apply the wrappers internally
make_packblob(
fb,
blobconfig,
readonly_storage,
blobstore_options,
logger,
config_store,
)
.watched(logger)
.await
.map(|store| Arc::new(store) as Arc<dyn BlobstorePutOps>)?
}
};
let store = if needs_wrappers {
let store = if let Some(component_sampler) = component_sampler {
Arc::new(SamplingBlobstorePutOps::new(
store,
blobstore_id,
component_sampler.clone(),
)) as Arc<dyn BlobstorePutOps>
} else {
store
};
let store = if readonly_storage.0 {
Arc::new(ReadOnlyBlobstore::new(store)) as Arc<dyn BlobstorePutOps>
} else {
store
};
let store = if blobstore_options.throttle_options.has_throttle() {
Arc::new(
ThrottledBlob::new(store, blobstore_options.throttle_options)
.watched(logger)
.await,
) as Arc<dyn BlobstorePutOps>
} else {
store
};
let store = if blobstore_options.chaos_options.has_chaos() {
Arc::new(ChaosBlobstore::new(store, blobstore_options.chaos_options))
as Arc<dyn BlobstorePutOps>
} else {
store
};
if blobstore_options.delay_options.has_delay() {
Arc::new(DelayedBlobstore::from_options(
store,
blobstore_options.delay_options,
)) as Arc<dyn BlobstorePutOps>
} else {
store
}
} else {
// Already applied the wrappers inside the store
store
};
// NOTE: Do not add wrappers here that should only be added once per repository, since this
// function will get called recursively for each member of a Multiplex! For those, use
// RepoBlobstore::new instead.
Ok(store)
}
.boxed()
}