in cli/src/klio_cli/commands/job/verify.py [0:0]
def _verify_gcs_bucket(self, bucket):
logging.info("Verifying {}".format(bucket))
gcs_loc = urlparse(bucket)
if not gcs_loc.scheme == "gs":
logging.error(
"Unsupported location type, skipping: {}".format(bucket)
)
return False
try:
if self.create_resources:
self.storage_client.create_bucket(gcs_loc.netloc)
logging.info("Creating your bucket {}...".format(bucket))
logging.info("Successfully created your bucket.")
else:
self.storage_client.get_bucket(gcs_loc.netloc)
logging.info("Bucket {} exists".format(bucket))
return True
except api_ex.Conflict:
logging.info("Bucket {} exists".format(bucket))
return True
except exceptions.NotFound:
logging.error("The bucket {} was not found.".format(bucket))
return False
except Exception as e:
logging.error("Unable to verify {} due to: {}".format(bucket, e))
return False