in core/src/services/s3/backend.rs [719:1023]
fn build(mut self) -> Result<impl Access> {
debug!("backend build started: {:?}", &self);
let root = normalize_root(&self.config.root.clone().unwrap_or_default());
debug!("backend use root {}", &root);
// Handle bucket name.
let bucket = if self.is_bucket_valid() {
Ok(&self.config.bucket)
} else {
Err(
Error::new(ErrorKind::ConfigInvalid, "The bucket is misconfigured")
.with_context("service", Scheme::S3),
)
}?;
debug!("backend use bucket {}", &bucket);
let default_storage_class = match &self.config.default_storage_class {
None => None,
Some(v) => Some(
build_header_value(v).map_err(|err| err.with_context("key", "storage_class"))?,
),
};
let server_side_encryption = match &self.config.server_side_encryption {
None => None,
Some(v) => Some(
build_header_value(v)
.map_err(|err| err.with_context("key", "server_side_encryption"))?,
),
};
let server_side_encryption_aws_kms_key_id =
match &self.config.server_side_encryption_aws_kms_key_id {
None => None,
Some(v) => Some(build_header_value(v).map_err(|err| {
err.with_context("key", "server_side_encryption_aws_kms_key_id")
})?),
};
let server_side_encryption_customer_algorithm =
match &self.config.server_side_encryption_customer_algorithm {
None => None,
Some(v) => Some(build_header_value(v).map_err(|err| {
err.with_context("key", "server_side_encryption_customer_algorithm")
})?),
};
let server_side_encryption_customer_key =
match &self.config.server_side_encryption_customer_key {
None => None,
Some(v) => Some(build_header_value(v).map_err(|err| {
err.with_context("key", "server_side_encryption_customer_key")
})?),
};
let server_side_encryption_customer_key_md5 =
match &self.config.server_side_encryption_customer_key_md5 {
None => None,
Some(v) => Some(build_header_value(v).map_err(|err| {
err.with_context("key", "server_side_encryption_customer_key_md5")
})?),
};
let checksum_algorithm = match self.config.checksum_algorithm.as_deref() {
Some("crc32c") => Some(ChecksumAlgorithm::Crc32c),
None => None,
v => {
return Err(Error::new(
ErrorKind::ConfigInvalid,
format!("{:?} is not a supported checksum_algorithm.", v),
))
}
};
// This is our current config.
let mut cfg = AwsConfig::default();
if !self.config.disable_config_load {
#[cfg(not(target_arch = "wasm32"))]
{
cfg = cfg.from_profile();
cfg = cfg.from_env();
}
}
if let Some(ref v) = self.config.region {
cfg.region = Some(v.to_string());
}
if cfg.region.is_none() {
return Err(Error::new(
ErrorKind::ConfigInvalid,
"region is missing. Please find it by S3::detect_region() or set them in env.",
)
.with_operation("Builder::build")
.with_context("service", Scheme::S3));
}
let region = cfg.region.to_owned().unwrap();
debug!("backend use region: {region}");
// Retain the user's endpoint if it exists; otherwise, try loading it from the environment.
self.config.endpoint = self.config.endpoint.or_else(|| cfg.endpoint_url.clone());
// Building endpoint.
let endpoint = self.build_endpoint(®ion);
debug!("backend use endpoint: {endpoint}");
// Setting all value from user input if available.
if let Some(v) = self.config.access_key_id {
cfg.access_key_id = Some(v)
}
if let Some(v) = self.config.secret_access_key {
cfg.secret_access_key = Some(v)
}
if let Some(v) = self.config.session_token {
cfg.session_token = Some(v)
}
let mut loader: Option<Box<dyn AwsCredentialLoad>> = None;
// If customized_credential_load is set, we will use it.
if let Some(v) = self.customized_credential_load {
loader = Some(v);
}
// If role_arn is set, we must use AssumeRoleLoad.
if let Some(role_arn) = self.config.role_arn {
// use current env as source credential loader.
let default_loader =
AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg.clone());
// Build the config for assume role.
let mut assume_role_cfg = AwsConfig {
region: Some(region.clone()),
role_arn: Some(role_arn),
external_id: self.config.external_id.clone(),
sts_regional_endpoints: "regional".to_string(),
..Default::default()
};
// override default role_session_name if set
if let Some(name) = self.config.role_session_name {
assume_role_cfg.role_session_name = name;
}
let assume_role_loader = AwsAssumeRoleLoader::new(
GLOBAL_REQWEST_CLIENT.clone().clone(),
assume_role_cfg,
Box::new(default_loader),
)
.map_err(|err| {
Error::new(
ErrorKind::ConfigInvalid,
"The assume_role_loader is misconfigured",
)
.with_context("service", Scheme::S3)
.set_source(err)
})?;
loader = Some(Box::new(assume_role_loader));
}
// If loader is not set, we will use default loader.
let loader = match loader {
Some(v) => v,
None => {
let mut default_loader =
AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg);
if self.config.disable_ec2_metadata {
default_loader = default_loader.with_disable_ec2_metadata();
}
Box::new(default_loader)
}
};
let signer = AwsV4Signer::new("s3", ®ion);
let delete_max_size = self
.config
.delete_max_size
.unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
Ok(S3Backend {
core: Arc::new(S3Core {
info: {
let am = AccessorInfo::default();
am.set_scheme(Scheme::S3)
.set_root(&root)
.set_name(bucket)
.set_native_capability(Capability {
stat: true,
stat_has_content_encoding: true,
stat_with_if_match: true,
stat_with_if_none_match: true,
stat_with_if_modified_since: true,
stat_with_if_unmodified_since: true,
stat_with_override_cache_control: !self
.config
.disable_stat_with_override,
stat_with_override_content_disposition: !self
.config
.disable_stat_with_override,
stat_with_override_content_type: !self
.config
.disable_stat_with_override,
stat_with_version: self.config.enable_versioning,
stat_has_cache_control: true,
stat_has_content_length: true,
stat_has_content_type: true,
stat_has_content_range: true,
stat_has_etag: true,
stat_has_content_md5: true,
stat_has_last_modified: true,
stat_has_content_disposition: true,
stat_has_user_metadata: true,
stat_has_version: true,
read: true,
read_with_if_match: true,
read_with_if_none_match: true,
read_with_if_modified_since: true,
read_with_if_unmodified_since: true,
read_with_override_cache_control: true,
read_with_override_content_disposition: true,
read_with_override_content_type: true,
read_with_version: self.config.enable_versioning,
write: true,
write_can_empty: true,
write_can_multi: true,
write_can_append: self.config.enable_write_with_append,
write_with_cache_control: true,
write_with_content_type: true,
write_with_content_encoding: true,
write_with_if_match: !self.config.disable_write_with_if_match,
write_with_if_not_exists: true,
write_with_user_metadata: true,
// The min multipart size of S3 is 5 MiB.
//
// ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
write_multi_min_size: Some(5 * 1024 * 1024),
// The max multipart size of S3 is 5 GiB.
//
// ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
write_multi_max_size: if cfg!(target_pointer_width = "64") {
Some(5 * 1024 * 1024 * 1024)
} else {
Some(usize::MAX)
},
delete: true,
delete_max_size: Some(delete_max_size),
delete_with_version: self.config.enable_versioning,
copy: true,
list: true,
list_with_limit: true,
list_with_start_after: true,
list_with_recursive: true,
list_with_versions: self.config.enable_versioning,
list_with_deleted: self.config.enable_versioning,
list_has_etag: true,
list_has_content_md5: true,
list_has_content_length: true,
list_has_last_modified: true,
presign: true,
presign_stat: true,
presign_read: true,
presign_write: true,
shared: true,
..Default::default()
});
// allow deprecated api here for compatibility
#[allow(deprecated)]
if let Some(client) = self.http_client {
am.update_http_client(|_| client);
}
am.into()
},
bucket: bucket.to_string(),
endpoint,
root,
server_side_encryption,
server_side_encryption_aws_kms_key_id,
server_side_encryption_customer_algorithm,
server_side_encryption_customer_key,
server_side_encryption_customer_key_md5,
default_storage_class,
allow_anonymous: self.config.allow_anonymous,
disable_list_objects_v2: self.config.disable_list_objects_v2,
enable_request_payer: self.config.enable_request_payer,
signer,
loader,
credential_loaded: AtomicBool::new(false),
checksum_algorithm,
}),
})
}