in cdk-project/lib/images/codebuild-image/python/src/notebooks/utils.py [0:0]
def _create_s3_bucket_if_it_does_not_exist(bucket_name, region, session):
    """Create an S3 bucket if it does not exist.

    Also swallows a few common exceptions that indicate that the bucket already
    exists or that it is being created.

    Args:
        bucket_name (str): Name of the S3 bucket to be created.
        region (str): The region in which to create the bucket.
        session: Object used to obtain the S3 resource through
            ``session.resource("s3", region_name=region)`` (presumably a
            ``boto3.session.Session`` -- confirm against callers).

    Raises:
        botocore.exceptions.ClientError: If S3 throws an unexpected exception
            during bucket creation. If the exception is due to the bucket
            already existing or already being created, no exception is raised.
    """
    # One resource handle serves both the existence probe and the creation call.
    s3 = session.resource("s3", region_name=region)

    # A non-None creation date is treated as proof that the bucket already
    # exists, in which case there is nothing to do.
    if s3.Bucket(name=bucket_name).creation_date is not None:
        return

    try:
        if region == "us-east-1":
            # 'us-east-1' cannot be specified because it is the default region:
            # https://github.com/boto/boto3/issues/125
            s3.create_bucket(Bucket=bucket_name)
        else:
            s3.create_bucket(
                Bucket=bucket_name,
                CreateBucketConfiguration={"LocationConstraint": region},
            )
    except botocore.exceptions.ClientError as e:
        error_code = e.response["Error"]["Code"]
        message = e.response["Error"]["Message"]

        already_owned = error_code == "BucketAlreadyOwnedByYou"
        # If this bucket is already being concurrently created, we don't need
        # to create it again.
        being_created = (
            error_code == "OperationAborted"
            and "conflicting conditional operation" in message
        )
        if not (already_owned or being_created):
            raise
    else:
        # print() does not interpolate logging-style arguments, so the bucket
        # name has to be formatted into the message explicitly.
        print("Created S3 bucket: %s" % bucket_name)