tools/gcs-bucket-mover/gcs_bucket_mover/bucket_mover_tester.py (108 lines of code) (raw):
# Copyright 2018 Google LLC. All rights reserved. Licensed under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
#
# Any software provided by Google hereunder is distributed "AS IS", WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, and is not intended for production use.
"""This is a testing method designed to be run from the command line.
It is meant to be run on a test bucket so you can confirm everything works before attempting to
move an actual production bucket. It will also attempt to delete the bucket from both the source
and target projects before it starts so that it can be run repeatedly.
"""
import uuid
from faker import Faker
from yaspin import yaspin
from google.cloud import exceptions
from google.cloud import storage
_CHECKMARK = "\u2713".encode("utf8")
def set_up_test_bucket(config, parsed_args):
    """Sets up the test bucket, adds objects and assigns various settings.
    It makes sure none of the buckets already exist, and then runs the main bucket mover service.
    Args:
        config: A Configuration object with all of the config values needed for the script to run
        parsed_args: the configargparser parsing of command line options
    Returns:
        The name of the randomly generated bucket
    """
    bucket_name = _get_random_bucket_name()
    config.temp_bucket_name = bucket_name + "-temp"

    with yaspin(text="TESTING: Cleanup source bucket") as spinner:
        try:
            _check_bucket_exists_and_delete(spinner,
                                            config.source_storage_client,
                                            bucket_name,
                                            config.source_project)
        except exceptions.Forbidden:
            # The bucket may already live in the target project instead;
            # retry the cleanup there before giving up.
            try:
                _check_bucket_exists_and_delete(spinner,
                                                config.target_storage_client,
                                                bucket_name,
                                                config.target_project)
            except exceptions.Forbidden:
                spinner.write(
                    f"TESTING: Not allowed to access bucket {bucket_name}")
                spinner.fail("X")
                raise SystemExit()

        new_bucket = create_bucket(config.source_storage_client, bucket_name,
                                   parsed_args)
        spinner.write(
            f"{_CHECKMARK} TESTING: Bucket {bucket_name} created in source"
            f" project {config.source_project}")
        _upload_blobs(new_bucket)

    with yaspin(text="TESTING: Cleanup target bucket") as spinner:
        _check_bucket_exists_and_delete(spinner,
                                        config.target_storage_client,
                                        config.temp_bucket_name,
                                        config.target_project)
    print()
    return bucket_name
def create_bucket(storage_client, bucket_name, parsed_args):
    """Creates the test bucket.
    Also sets up lots of different bucket settings to make sure they can be moved.
    Args:
        storage_client: The storage client object used to access GCS
        bucket_name: The name of the bucket to create
        parsed_args: the configargparser parsing of command line options
    Returns:
        The bucket object that has been created in GCS
    """
    new_bucket = storage.Bucket(client=storage_client, name=bucket_name)

    # Requester pays
    new_bucket.requester_pays = False

    # CORS: append one policy entry and give it a max age.
    cors_entries = new_bucket.cors
    cors_entries.append({"origin": ["/foo"]})
    cors_entries[0]["maxAgeSeconds"] = 3600
    new_bucket.cors = cors_entries

    # KMS Key - When a custom KMS key is set up, uncomment the line below to test it
    # new_bucket.default_kms_key_name = parsed_args.test_default_kms_key_name

    # Labels
    new_bucket.labels = {"colour": "red", "flavour": "cherry"}

    # Object Lifecycle Rules: delete objects once they are a year old.
    new_bucket.lifecycle_rules = [{
        "action": {"type": "Delete"},
        "condition": {"age": 365},
    }]

    # Location
    new_bucket.location = parsed_args.test_bucket_location
    # Storage Class
    new_bucket.storage_class = parsed_args.test_storage_class

    # File Versioning
    # Setting this to True means we can't delete a non-empty bucket with the CLI in one
    # bucket.delete command
    new_bucket.versioning_enabled = False

    # Access Logs
    new_bucket.enable_logging(parsed_args.test_logging_bucket,
                              parsed_args.test_logging_prefix)

    new_bucket.create()

    # IAM Policies: grant storage.admin to the test user (must happen after create).
    iam_policy = new_bucket.get_iam_policy()
    # Uncomment the line below to view the existing IAM policies
    # print(json.dumps(iam_policy.to_api_repr(), indent=4, sort_keys=True))
    iam_policy["roles/storage.admin"].add(
        "user:" + parsed_args.test_email_for_iam)
    new_bucket.set_iam_policy(iam_policy)

    # ACLs
    new_bucket.acl.user(parsed_args.test_email_for_iam).grant_read()
    new_bucket.acl.save()

    # Default Object ACL
    new_bucket.default_object_acl.user(
        parsed_args.test_email_for_iam).grant_read()
    new_bucket.default_object_acl.save()
    new_bucket.update()

    # Bucket Notification: publish finalize/delete events to the test topic.
    notification = storage.notification.BucketNotification(
        new_bucket,
        parsed_args.test_topic_name,
        custom_attributes={"myKey": "myValue"},
        event_types=["OBJECT_FINALIZE", "OBJECT_DELETE"],
        payload_format="JSON_API_V1",
    )
    notification.create()
    return new_bucket
def _get_random_bucket_name():
"""Generate a random bucket name for testing purposes"""
return "bucket_mover_" + str(uuid.uuid4())
def _check_bucket_exists_and_delete(spinner, storage_client, bucket_name,
                                    project_name):
    """Checks if the bucket exists and delete it.
    If it already exists, prompt the user to make sure they want to delete it and everything in
    it.
    Args:
        spinner: The spinner displayed in the console
        storage_client: The storage client object used to access GCS
        bucket_name: The name of the bucket to check if it exists
        project_name: The name of the project to check the bucket exists in
    Raises:
        SystemExit: If the bucket already exists and the user does not choose to delete it
    """
    target = storage.Bucket(client=storage_client,
                            name=bucket_name,
                            user_project=project_name)
    if not target.exists():
        return  # Nothing to clean up.

    # Pause the spinner while waiting for user confirmation on stdin.
    spinner.hide()
    confirmation = input(
        f"\nWARNING!!! Bucket {bucket_name} already exists in project"
        f" {project_name}\nType YES to confirm you want to delete it: ")
    spinner.show()

    if confirmation != "YES":
        spinner.fail("X")
        raise SystemExit()

    spinner.write("")
    target.delete(force=True)
    spinner.write(
        f"{_CHECKMARK} TESTING: Bucket {bucket_name} deleted from project"
        f" {project_name}")
def _upload_blobs(bucket):
    """Uploads some random text files to the bucket.
    Args:
        bucket: The bucket object to upload blobs to
    """
    with yaspin(text="TESTING: Uploading 5 random txt files") as spinner:
        fake = Faker()
        for index in range(5):
            new_blob = bucket.blob(fake.file_name(extension="txt"))  # pylint: disable=no-member
            new_blob.metadata = {"customKey": f"myFakeValue{index}"}
            new_blob.upload_from_string(fake.text())  # Generator is dynamic. pylint: disable=no-member
        spinner.ok(_CHECKMARK)