in taskcat/_dataclasses.py [0:0]
def create(self):
if self._bucket_matches_existing():
return
kwargs = {"Bucket": self.name}
if self.region != "us-east-1":
kwargs["CreateBucketConfiguration"] = {"LocationConstraint": self.region}
self.s3_client.create_bucket(**kwargs)
try:
self.s3_client.get_waiter("bucket_exists").wait(Bucket=self.name)
if not self.regional_buckets:
self.s3_client.put_bucket_tagging(
Bucket=self.name,
Tagging={
"TagSet": [{"Key": "taskcat-id", "Value": self.taskcat_id.hex}]
},
)
if self.sigv4 is True:
self.s3_client.put_bucket_policy(
Bucket=self.name, Policy=self.sigv4_policy
)
if self.org_id is not None:
self.s3_client.put_bucket_policy(
Bucket=self.name, Policy=self.multi_account_policy
)
except Exception as e: # pylint: disable=broad-except
try:
self.s3_client.delete_bucket(Bucket=self.name)
except Exception as inner_e: # pylint: disable=broad-except
LOG.warning(f"failed to remove bucket {self.name}: {inner_e}")
raise e