tools/gcs-bucket-mover/gcs_bucket_mover/bucket_mover_service.py (709 lines of code) (raw):
# Copyright 2018 Google LLC. All rights reserved. Licensed under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
#
# Any software provided by Google hereunder is distributed "AS IS", WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, and is not intended for production use.
"""Script to move a bucket, all settings and data from one project to another."""
import datetime
import json
from time import sleep
from retrying import retry
from yaspin import yaspin
from google.api_core import iam as api_core_iam
from google.cloud import exceptions
from google.cloud import pubsub
from google.cloud import storage
from google.cloud.storage import iam
from googleapiclient import discovery
from gcs_bucket_mover import bucket_details
from gcs_bucket_mover import sts_job_status
_CHECKMARK = "\u2713".encode("utf8")
def main(config, parsed_args, cloud_logger):
"""Main entry point for the bucket mover tool
Args:
config: A Configuration object with all of the config values needed for the script to run
parsed_args: the configargparser parsing of command line options
cloud_logger: A GCP logging client instance
"""
cloud_logger.log_text("Starting GCS Bucket Mover")
_print_config_details(cloud_logger, config)
source_bucket = (
config.source_storage_client.lookup_bucket( # pylint: disable=no-member
config.bucket_name))
if source_bucket is None:
msg = "The source bucket does not exist, so we cannot continue"
cloud_logger.log_text(msg)
raise SystemExit(msg)
# Get copies of all of the source bucket's IAM, ACLs and settings so they
# can be copied over to the target project bucket; details are retrievable
# only if the corresponding feature is enabled in the configuration
source_bucket_details = bucket_details.BucketDetails(
conf=parsed_args, source_bucket=source_bucket)
transfer_log_value = _check_log_values(cloud_logger, config)
_check_bucket_lock(cloud_logger, config, source_bucket,
source_bucket_details)
sts_client = discovery.build("storagetransfer",
"v1",
credentials=config.target_project_credentials)
if config.is_rename:
_rename_bucket(
cloud_logger,
config,
source_bucket,
source_bucket_details,
sts_client,
transfer_log_value,
)
else:
_move_bucket(
cloud_logger,
config,
source_bucket,
source_bucket_details,
sts_client,
transfer_log_value,
)
cloud_logger.log_text("Completed GCS Bucket Mover")
def _check_log_values(cloud_logger, config):
log_action_list = ["COPY", "DELETE", "FIND"]
log_states_list = ["SUCCEEDED", "FAILED"]
log_action_final = []
log_states_final = []
if config.log_action and not (config.log_action_state):
log_action = config.log_action.split(",")
for ele in log_action:
if ele in log_action_list:
log_action_final.append(ele)
else:
msg = "Entered log action is incorrect"
cloud_logger.log_text(msg)
with yaspin(text=msg) as spinner:
spinner.ok(_CHECKMARK)
raise Exception(msg)
transfer_log_value = {"logActions": log_action_final}
elif not (config.log_action) and (config.log_action_state):
log_states = config.log_action_state.split(",")
for ele in log_states:
if ele in log_states_list:
log_states_final.append(ele)
else:
msg = "Entered log states is incorrect"
cloud_logger.log_text(msg)
with yaspin(text=msg) as spinner:
spinner.ok(_CHECKMARK)
raise Exception(msg)
transfer_log_value = {"logActionStates": log_states_final}
elif (config.log_action) and (config.log_action_state):
log_action = config.log_action.split(",")
for ele in log_action:
if ele in log_action_list:
log_action_final.append(ele)
else:
msg = "Entered log action or log state is incorrect"
cloud_logger.log_text(msg)
with yaspin(text=msg) as spinner:
spinner.ok(_CHECKMARK)
raise Exception(msg)
log_states = config.log_action_state.split(",")
for ele in log_states:
if ele in log_states_list:
log_states_final.append(ele)
else:
msg = "Entered log states is incorrect"
cloud_logger.log_text(msg)
with yaspin(text=msg) as spinner:
spinner.ok(_CHECKMARK)
raise Exception(msg)
transfer_log_value = {
"logActions": log_action_final,
"logActionStates": log_states_final,
}
else:
transfer_log_value = None
return transfer_log_value
def _rename_bucket(
cloud_logger,
config,
source_bucket,
source_bucket_details,
sts_client,
transfer_log_value,
):
"""Main method for doing a bucket rename
This can also involve a move across projects.
Args:
cloud_logger: A GCP logging client instance
config: A Configuration object with all of the config values needed for the script to run
source_bucket: The bucket object for the original source bucket in the source project
source_bucket_details: The details copied from the source bucket that is being moved
sts_client: The STS client object to be used
"""
target_bucket = _create_target_bucket(cloud_logger, config,
source_bucket_details,
config.target_bucket_name)
sts_account_email = _assign_sts_permissions(cloud_logger, sts_client,
config, target_bucket)
_run_and_wait_for_sts_job(
sts_client,
config.target_project,
config.bucket_name,
config.target_bucket_name,
cloud_logger,
config,
transfer_log_value,
)
_delete_empty_source_bucket(cloud_logger, source_bucket)
_remove_sts_permissions(cloud_logger, sts_account_email, config,
config.target_bucket_name)
def _move_bucket(
cloud_logger,
config,
source_bucket,
source_bucket_details,
sts_client,
transfer_log_value,
):
"""Main method for doing a bucket move.
This flow does not include a rename, the target bucket will have the same
name as the source bucket.
Args:
cloud_logger: A GCP logging client instance
config: A Configuration object with all of the config values needed for the script to run
source_bucket: The bucket object for the original source bucket in the source project
source_bucket_details: The details copied from the source bucket that is being moved
sts_client: The STS client object to be used
"""
target_temp_bucket = _create_target_bucket(cloud_logger, config,
source_bucket_details,
config.temp_bucket_name)
sts_account_email = _assign_sts_permissions(cloud_logger, sts_client,
config, target_temp_bucket)
_run_and_wait_for_sts_job(
sts_client,
config.target_project,
config.bucket_name,
config.temp_bucket_name,
cloud_logger,
config,
transfer_log_value,
)
_delete_empty_source_bucket(cloud_logger, source_bucket)
_recreate_source_bucket(cloud_logger, config, source_bucket_details)
_assign_sts_permissions_to_new_bucket(cloud_logger, sts_account_email,
config)
_run_and_wait_for_sts_job(
sts_client,
config.target_project,
config.temp_bucket_name,
config.bucket_name,
cloud_logger,
config,
transfer_log_value,
)
_delete_empty_temp_bucket(cloud_logger, target_temp_bucket)
_remove_sts_permissions(cloud_logger, sts_account_email, config,
config.bucket_name)
def _print_config_details(cloud_logger, config):
"""Print out the pertinent project/bucket details
Args:
cloud_logger: A GCP logging client instance
config: A Configuration object with all of the config values needed for the script to run
"""
_print_and_log(cloud_logger,
"Source Project: {}".format(config.source_project))
_print_and_log(cloud_logger, "Source Bucket: {}".format(config.bucket_name))
_print_and_log(
cloud_logger,
"Source Service Account: {}".format(
config.source_project_credentials.service_account_email),
) # pylint: disable=no-member
_print_and_log(cloud_logger,
"Target Project: {}".format(config.target_project))
_print_and_log(cloud_logger,
"Target Bucket: {}".format(config.target_bucket_name))
_print_and_log(
cloud_logger,
"Target Service Account: {}".format(
config.target_project_credentials.service_account_email),
) # pylint: disable=no-member
def _check_bucket_lock(cloud_logger, config, bucket, source_bucket_details):
"""Confirm there is no lock and we can continue with the move
Args:
cloud_logger: A GCP logging client instance
config: A Configuration object with all of the config values needed for the script to run
bucket: The bucket object to lock down
source_bucket_details: The details copied from the source bucket that is being moved
"""
if not config.disable_bucket_lock:
spinner_text = "Confirming that lock file {} does not exist".format(
config.lock_file_name)
cloud_logger.log_text(spinner_text)
with yaspin(text=spinner_text) as spinner:
_write_spinner_and_log(
spinner,
cloud_logger,
"Logging source bucket IAM and ACLs to Stackdriver",
)
cloud_logger.log_text(
json.dumps(source_bucket_details.iam_policy.to_api_repr()))
if source_bucket_details.acl_entities:
for entity in source_bucket_details.acl_entities:
cloud_logger.log_text(str(entity))
_lock_down_bucket(
spinner,
cloud_logger,
bucket,
config.lock_file_name,
config.source_project_credentials.service_account_email,
) # pylint: disable=no-member
spinner.ok(_CHECKMARK)
def _lock_down_bucket(spinner, cloud_logger, bucket, lock_file_name,
service_account_email):
"""Change the ACL/IAM on the bucket so that only the service account can access it.
Args:
spinner: The spinner displayed in the console
cloud_logger: A GCP logging client instance
bucket: The bucket object to lock down
lock_file_name: The name of the lock file
service_account_email: The email of the service account
"""
if storage.Blob(lock_file_name, bucket).exists():
spinner.fail("X")
msg = "The lock file exists in the source bucket, so we cannot continue"
cloud_logger.log_text(msg)
raise SystemExit(msg)
spinner.ok(_CHECKMARK)
msg = "Locking down the bucket by revoking all ACLs/IAM policies"
spinner.text = msg
cloud_logger.log_text(msg)
is_uniform_bucket = vars(bucket)["_properties"]["iamConfiguration"][
"uniformBucketLevelAccess"]["enabled"]
if not is_uniform_bucket:
# Turn off any bucket ACLs
bucket.acl.save_predefined("private")
# Revoke all IAM access and only set the service account as an admin
policy = api_core_iam.Policy()
policy["roles/storage.admin"].add("serviceAccount:" + service_account_email)
bucket.set_iam_policy(policy)
def _create_target_bucket(cloud_logger, config, source_bucket_details,
bucket_name):
"""Creates either the temp bucket or target bucket (during rename) in the target project
Args:
cloud_logger: A GCP logging client instance
config: A Configuration object with all of the config values needed for the script to run
source_bucket_details: The details copied from the source bucket that is being moved
bucket_name: The name of the bucket to create
Returns:
The bucket object that has been created in GCS
"""
if config.is_rename:
spinner_text = "Creating target bucket {} in project {}".format(
bucket_name, config.target_project)
else:
spinner_text = "Creating temp target bucket {} in project {}".format(
bucket_name, config.target_project)
cloud_logger.log_text(spinner_text)
with yaspin(text=spinner_text) as spinner:
target_bucket = _create_bucket(spinner, cloud_logger, config,
bucket_name, source_bucket_details)
_write_spinner_and_log(
spinner,
cloud_logger,
"Bucket {} created in target project {}".format(
bucket_name, config.target_project),
)
return target_bucket
def _assign_sts_permissions(cloud_logger, sts_client, config,
target_temp_bucket):
"""Assign the required STS permissions to the source/temp bucket
Args:
cloud_logger: A GCP logging client instance
sts_client: The STS client object to be used
config: A Configuration object with all of the config values needed for the script to run
target_temp_bucket: The bucket object for the temp bucket in the target project
Returns:
The email account of the STS account
"""
spinner_text = "Assigning STS permissions to source/temp buckets"
cloud_logger.log_text(spinner_text)
with yaspin(text=spinner_text) as spinner:
sts_account_email = _get_sts_iam_account_email(sts_client,
config.target_project)
_write_spinner_and_log(
spinner,
cloud_logger,
"STS service account for IAM usage: {}".format(sts_account_email),
)
_assign_sts_iam_roles(
sts_account_email,
config.source_storage_client,
config.source_project,
config.bucket_name,
True,
)
_assign_sts_iam_roles(
sts_account_email,
config.target_storage_client,
config.target_project,
target_temp_bucket.name,
True,
)
spinner.ok(_CHECKMARK)
return sts_account_email
def _assign_sts_permissions_to_new_bucket(cloud_logger, sts_account_email,
config):
"""Assign the required STS permissions to the new source bucket in the target project
Args:
cloud_logger: A GCP logging client instance
sts_account_email: The email account of the STS account
config: A Configuration object with all of the config values needed for the script to run
"""
spinner_text = "Assigning STS permissions to new source bucket"
cloud_logger.log_text(spinner_text)
with yaspin(text=spinner_text) as spinner:
_assign_sts_iam_roles(
sts_account_email,
config.target_storage_client,
config.target_project,
config.bucket_name,
False,
)
spinner.ok(_CHECKMARK)
def _delete_empty_source_bucket(cloud_logger, source_bucket):
"""Delete the empty source bucket
Args:
cloud_logger: A GCP logging client instance
source_bucket: The bucket object for the original source bucket in the source project
"""
spinner_text = "Deleting empty source bucket"
cloud_logger.log_text(spinner_text)
with yaspin(text=spinner_text) as spinner:
source_bucket.delete()
spinner.ok(_CHECKMARK)
def _recreate_source_bucket(cloud_logger, config, source_bucket_details):
"""Now that the original source bucket is deleted, re-create it in the target project
Args:
cloud_logger: A GCP logging client instance
config: A Configuration object with all of the config values needed for the script to run
source_bucket_details: The details copied from the source bucket that is being moved
"""
spinner_text = "Re-creating source bucket in target project"
cloud_logger.log_text(spinner_text)
with yaspin(text=spinner_text) as spinner:
_create_bucket(spinner, cloud_logger, config, config.bucket_name,
source_bucket_details)
spinner.ok(_CHECKMARK)
def _delete_empty_temp_bucket(cloud_logger, target_temp_bucket):
"""Now that the temp bucket is empty, delete it
Args:
cloud_logger: A GCP logging client instance
target_temp_bucket: The GCS bucket object of the target temp bucket
"""
spinner_text = "Deleting empty temp bucket"
cloud_logger.log_text(spinner_text)
with yaspin(text=spinner_text) as spinner:
target_temp_bucket.delete()
spinner.ok(_CHECKMARK)
def _remove_sts_permissions(cloud_logger, sts_account_email, config,
bucket_name):
"""Remove the STS permissions from the new source bucket in the target project
Args:
cloud_logger: A GCP logging client instance
sts_account_email: The email account of the STS account
config: A Configuration object with all of the config values needed for the script to run
bucket_name: The name of the bucket to remove the permissions from
"""
spinner_text = "Removing STS permissions from bucket {}".format(bucket_name)
cloud_logger.log_text(spinner_text)
with yaspin(text=spinner_text) as spinner:
_remove_sts_iam_roles(sts_account_email, config.target_storage_client,
bucket_name)
spinner.ok(_CHECKMARK)
def _get_project_number(project_id, credentials):
"""Using the project id, get the unique project number for a project.
Args:
project_id: The id of the project
credentials: The credentials to use for accessing the project
Returns:
The project number as a string
"""
crm = discovery.build("cloudresourcemanager", "v1", credentials=credentials)
project = (crm.projects().get(projectId=project_id).execute(num_retries=5)) # pylint: disable=no-member
return project["projectNumber"]
def _create_bucket(spinner, cloud_logger, config, bucket_name,
source_bucket_details):
"""Creates a bucket and replicates all of the settings from source_bucket_details.
Args:
spinner: The spinner displayed in the console
cloud_logger: A GCP logging client instance
config: A Configuration object with all of the config values needed for the script to run
bucket_name: The name of the bucket to create
source_bucket_details: The details copied from the source bucket that is being moved
Returns:
The bucket object that has been created in GCS
"""
bucket = storage.Bucket(client=config.target_storage_client,
name=bucket_name)
bucket.location = source_bucket_details.location
bucket.storage_class = source_bucket_details.storage_class
bucket.requester_pays = source_bucket_details.requester_pays
bucket.cors = source_bucket_details.cors
bucket.labels = source_bucket_details.labels
bucket.lifecycle_rules = source_bucket_details.lifecycle_rules
bucket.versioning_enabled = source_bucket_details.versioning_enabled
if source_bucket_details.default_kms_key_name:
bucket.default_kms_key_name = source_bucket_details.default_kms_key_name
# The target project GCS service account must be given
# Encrypter/Decrypter permission for the key
_add_target_project_to_kms_key(
spinner, cloud_logger, config,
source_bucket_details.default_kms_key_name)
if source_bucket_details.logging:
bucket.enable_logging(
source_bucket_details.logging["logBucket"],
source_bucket_details.logging["logObjectPrefix"],
)
_create_bucket_api_call(spinner, cloud_logger, bucket)
if source_bucket_details.iam_policy:
_update_iam_policies(config, bucket, source_bucket_details)
_write_spinner_and_log(
spinner,
cloud_logger,
"IAM policies successfully copied over from the source bucket",
)
if source_bucket_details.acl_entities:
new_acl = _update_acl_entities(config,
source_bucket_details.acl_entities)
bucket.acl.save(acl=new_acl)
_write_spinner_and_log(
spinner,
cloud_logger,
"ACLs successfully copied over from the source bucket",
)
else:
_print_and_log(cloud_logger,
"setting target bucket to uniform level access")
bucket.iam_configuration.uniform_bucket_level_access_enabled = True
bucket.patch()
if source_bucket_details.default_obj_acl_entities:
new_default_obj_acl = _update_acl_entities(
config, source_bucket_details.default_obj_acl_entities)
bucket.default_object_acl.save(acl=new_default_obj_acl)
_write_spinner_and_log(
spinner,
cloud_logger,
"Default Object ACLs successfully copied over from the source bucket",
)
if source_bucket_details.notifications:
_update_notifications(spinner, cloud_logger, config,
source_bucket_details.notifications, bucket)
_write_spinner_and_log(
spinner,
cloud_logger,
"{} Created {} new notifications for the bucket {}".format(
_CHECKMARK, len(source_bucket_details.notifications),
bucket_name),
)
return bucket
def _retry_if_false(result):
"""Return True if we should retry because the function returned False"""
return result is False
@retry(
retry_on_result=_retry_if_false,
wait_exponential_multiplier=4000,
wait_exponential_max=60000,
stop_max_attempt_number=5,
)
def _create_bucket_api_call(spinner, cloud_logger, bucket):
"""Calls the GCS api method to create the bucket.
The method will attempt to retry up to 5 times if the 503 ServiceUnavailable
exception is raised.
Args:
spinner: The spinner displayed in the console
cloud_logger: A GCP logging client instance
bucket: The bucket object to create
Returns:
True if the bucket was created, False if a ServiceUnavailable exception was raised
Raises:
google.cloud.exceptions.Conflict: The underlying Google Cloud api will raise this error if
the bucket already exists.
"""
try:
bucket.create()
except exceptions.ServiceUnavailable:
_write_spinner_and_log(
spinner,
cloud_logger,
"503 Service Unavailable error returned."
" Retrying up to 5 times with exponential backoff.",
)
return False
return True
def _update_iam_policies(config, bucket, source_bucket_details):
"""Take the existing IAM, replace the source project number with the target project
number and then assign the IAM to the new bucket.
Args:
config: A Configuration object with all of the config values needed for the script to run
bucket: The bucket object to update the IAM policies for
source_bucket_details: The details copied from the source bucket that is being moved
"""
policy = bucket.get_iam_policy()
# Update the original policy with the etag for the policy we just got so the update is
# associated with our get request to make sure no other update overwrites our change
source_bucket_details.iam_policy.etag = policy.etag
for role in source_bucket_details.iam_policy:
for member in source_bucket_details.iam_policy[role]:
# If a project level role was set, replace it with an identical one for the new project
if ":" + config.source_project in member:
new_member = member.replace(config.source_project,
config.target_project)
source_bucket_details.iam_policy[role].discard(member)
source_bucket_details.iam_policy[role].add(new_member)
# Give the target bucket all of the same policies as the source bucket, but with updated
# project roles
bucket.set_iam_policy(source_bucket_details.iam_policy)
def _update_acl_entities(config, source_entities):
"""Update the source ACL entities.
Take the existing ACLs, replace the source project number with the target project number and
then assign the ACLs to the new bucket.
Args:
config: A Configuration object with all of the config values needed for the script to run
source_entities: The existing ACL entities of the bucket
Returns:
The list of ACLs with project numbers replaced
"""
source_project_number = _get_project_number(
config.source_project, config.source_project_credentials)
target_project_number = _get_project_number(
config.target_project, config.target_project_credentials)
new_acl = storage.acl.ACL()
new_acl.loaded = True
# If an entity is for the source project, replace it with the identical one for the new
# project
for entity in source_entities:
# Skip it if it has no identifier
if not hasattr(entity, "identifier"):
continue
# Skip it if the identifier is empty
if entity.identifier is None:
continue
# Skip it if the identifier doesn't contain the source project number
if "-" + source_project_number not in entity.identifier:
continue
# Replace the source project number with the target project number and add the entity
entity.identifier = entity.identifier.replace(source_project_number,
target_project_number)
new_acl.add_entity(entity)
return new_acl
def _update_notifications(spinner, cloud_logger, config, notifications, bucket):
"""Update the notifications on the target bucket to match those from the source bucket.
Args:
spinner: The spinner displayed in the console
cloud_logger: A GCP logging client instance
config: A Configuration object with all of the config values needed for the script to run
notifications: A list of notifications to add to the bucket
bucket: The bucket object to update the notifications for
"""
for item in notifications:
# Give target project service account access to publish to source project topic
_assign_target_project_to_topic(spinner, cloud_logger, config,
item.topic_name, item.topic_project)
notification = storage.notification.BucketNotification(
bucket,
item.topic_name,
topic_project=item.topic_project,
custom_attributes=item.custom_attributes,
event_types=item.event_types,
blob_name_prefix=item.blob_name_prefix,
payload_format=item.payload_format,
)
notification.create()
def _get_sts_iam_account_email(sts_client, project_id):
"""Get the account email that the STS service will run under.
Args:
sts_client: The STS client object to be used
project_id: The id of the project
Returns:
The STS service account email as a string
"""
result = (sts_client.googleServiceAccounts().get(
projectId=project_id).execute(num_retries=5))
return result["accountEmail"]
def _assign_sts_iam_roles(sts_email, storage_client, project_name, bucket_name,
assign_viewer):
"""Assign roles to the STS service account that will be required to interact with the bucket.
Args:
sts_email: The email address for the STS service account
storage_client: The storage client object used to access GCS
project_name: The name of the project
bucket_name: The name of the bucket
assign_viewer: True if we should also assign the Object Viewer/LegacyReader roles
"""
account = "serviceAccount:" + sts_email
bucket = storage_client.bucket(bucket_name, project_name)
policy = bucket.get_iam_policy()
policy["roles/storage.legacyBucketWriter"].add(account)
if assign_viewer:
policy[iam.STORAGE_OBJECT_VIEWER_ROLE].add(account)
policy["roles/storage.legacyBucketReader"].add(account)
bucket.set_iam_policy(policy)
def _remove_sts_iam_roles(sts_email, storage_client, bucket_name):
"""Remove the roles that were assigned for the STS service account.
Args:
sts_email: The email address for the STS service account
storage_client: The storage client object used to access GCS
bucket_name: The name of the bucket
"""
account = "serviceAccount:" + sts_email
bucket = storage_client.bucket(bucket_name)
policy = bucket.get_iam_policy()
policy["roles/storage.legacyBucketWriter"].discard(account)
bucket.set_iam_policy(policy)
def _add_target_project_to_kms_key(spinner, cloud_logger, config, kms_key_name):
"""Gives the service_account_email the Encrypter/Decrypter role for the given KMS key.
Args:
spinner: The spinner displayed in the console
cloud_logger: A GCP logging client instance
config: A Configuration object with all of the config values needed for the script to run
kms_key_name: The name of the KMS key that the project should be given access to
"""
kms_client = discovery.build("cloudkms",
"v1",
credentials=config.source_project_credentials)
# Get the current IAM policy and add the new member to it.
crypto_keys = (kms_client.projects().locations().keyRings().cryptoKeys()) # pylint: disable=no-member
policy_request = crypto_keys.getIamPolicy(resource=kms_key_name)
policy_response = policy_request.execute(num_retries=5)
bindings = []
if "bindings" in list(policy_response.keys()):
bindings = policy_response["bindings"]
service_account_email = config.target_storage_client.get_service_account_email(
)
members = ["serviceAccount:" + service_account_email]
bindings.append({
"role": "roles/cloudkms.cryptoKeyEncrypterDecrypter",
"members": members,
})
policy_response["bindings"] = bindings
# Set the new IAM Policy.
request = crypto_keys.setIamPolicy(resource=kms_key_name,
body={"policy": policy_response})
request.execute(num_retries=5)
_write_spinner_and_log(
spinner,
cloud_logger,
"{} {} added as Enrypter/Decrypter to key: {}".format(
_CHECKMARK, service_account_email, kms_key_name),
)
def _assign_target_project_to_topic(spinner, cloud_logger, config, topic_name,
topic_project):
"""Gives the service_account_email the Publisher role for the topic.
Args:
spinner: The spinner displayed in the console
cloud_logger: A GCP logging client instance
config: A Configuration object with all of the config values needed for the script to run
topic_name: The name of the topic that the target project should be assigned to
topic_project: The name of the project that the topic belongs to
"""
client = pubsub.PublisherClient(
credentials=config.source_project_credentials)
topic_path = client.topic_path(topic_project, topic_name) # pylint: disable=no-member
policy = client.get_iam_policy(topic_path) # pylint: disable=no-member
service_account_email = config.target_storage_client.get_service_account_email(
)
policy.bindings.add(
role="roles/pubsub.publisher",
members=["serviceAccount:" + service_account_email],
)
client.set_iam_policy(topic_path, policy) # pylint: disable=no-member
_write_spinner_and_log(
spinner,
cloud_logger,
"{} {} added as a Publisher to topic: {}".format(
_CHECKMARK, service_account_email, topic_name),
)
@retry(
retry_on_result=_retry_if_false,
wait_exponential_multiplier=10000,
wait_exponential_max=120000,
stop_max_attempt_number=10,
)
def _run_and_wait_for_sts_job(
sts_client,
target_project,
source_bucket_name,
sink_bucket_name,
cloud_logger,
config,
transfer_log_value,
):
"""Kick off the STS job and wait for it to complete. Retry if it fails.
Args:
sts_client: The STS client object to be used
target_project: The name of the target project where the STS job will be created
source_bucket_name: The name of the bucket where the STS job will transfer from
sink_bucket_name: The name of the bucket where the STS job will transfer to
cloud_logger: A GCP logging client instance
Returns:
True if the STS job completed successfully, False if it failed for any reason
"""
# Note that this routine is in a @retry decorator, so non-True exits
# and unhandled exceptions will trigger a retry.
msg = "Moving from bucket {} to {}".format(source_bucket_name,
sink_bucket_name)
_print_and_log(cloud_logger, msg)
spinner_text = "Creating STS job"
cloud_logger.log_text(spinner_text)
with yaspin(text=spinner_text) as spinner:
sts_job_name = _execute_sts_job(
sts_client,
target_project,
source_bucket_name,
sink_bucket_name,
config,
transfer_log_value,
)
spinner.ok(_CHECKMARK)
# Check every 10 seconds until STS job is complete
with yaspin(text="Checking STS job status") as spinner:
while True:
job_status = _check_sts_job(spinner, cloud_logger, sts_client,
target_project, sts_job_name)
if job_status != sts_job_status.StsJobStatus.IN_PROGRESS:
break
sleep(10)
if job_status == sts_job_status.StsJobStatus.SUCCESS:
return True
# Execution will only reach this code if something went wrong with the STS job
_print_and_log(
cloud_logger,
"There was an unexpected failure with the STS job. You can view the"
" details in the cloud console.",
)
_print_and_log(
cloud_logger,
"Waiting for a period of time and then trying again. If you choose to"
" cancel this script, the buckets will need to be manually cleaned up.",
)
return False
def _execute_sts_job(
sts_client,
target_project,
source_bucket_name,
sink_bucket_name,
config,
transfer_log_value,
):
"""Start the STS job.
Args:
sts_client: The STS client object to be used
target_project: The name of the target project where the STS job will be created
source_bucket_name: The name of the bucket where the STS job will transfer from
sink_bucket_name: The name of the bucket where the STS job will transfer to
Returns:
The name of the STS job as a string
"""
now = datetime.date.today()
if config.bucket_name == sink_bucket_name:
time_preserved = None
else:
if config.preserve_custom_time == None:
time_preserved = None
elif config.preserve_custom_time == "TIME_CREATED_PRESERVE_AS_CUSTOM_TIME":
time_preserved = config.preserve_custom_time
elif config.preserve_custom_time == "TIME_CREATED_SKIP":
time_preserved = config.preserve_custom_time
elif config.preserve_custom_time == "TIME_CREATED_UNSPECIFIED":
time_preserved = config.preserve_custom_time
else:
msg = "Time created value is not available"
raise SystemExit(msg)
transfer_job = {
"description":
"Move bucket {} to {} in project {}".format(source_bucket_name,
sink_bucket_name,
target_project),
"status":
"ENABLED",
"projectId":
target_project,
"schedule": {
"scheduleStartDate": {
"day": now.day - 1,
"month": now.month,
"year": now.year,
},
"scheduleEndDate": {
"day": now.day - 1,
"month": now.month,
"year": now.year,
},
},
"transferSpec": {
"gcsDataSource": {
"bucketName": source_bucket_name
},
"gcsDataSink": {
"bucketName": sink_bucket_name
},
"transferOptions": {
"deleteObjectsFromSourceAfterTransfer": True,
"metadataOptions": {
"timeCreated": time_preserved
},
},
},
}
transfer_job["loggingConfig"] = transfer_log_value
result = sts_client.transferJobs().create(body=transfer_job).execute(
num_retries=5)
return result["name"]
def _check_sts_job(spinner, cloud_logger, sts_client, target_project, job_name):
"""Check on the status of the STS job.
Args:
spinner: The spinner displayed in the console
cloud_logger: A GCP logging client instance
sts_client: The STS client object to be used
target_project: The name of the target project where the STS job will be created
job_name: The name of the STS job that was created
Returns:
The status of the job as an StsJobStatus enum
"""
filter_string = (
'{{"project_id": "{project_id}", "job_names": ["{job_name}"]}}').format(
project_id=target_project, job_name=job_name)
result = (sts_client.transferOperations().list(
name="transferOperations", filter=filter_string).execute(num_retries=5))
if result:
operation = result["operations"][0]
metadata = operation["metadata"]
if operation.get("done"):
if metadata["status"] != "SUCCESS":
spinner.fail("X")
return sts_job_status.StsJobStatus.FAILED
_print_sts_counters(spinner, cloud_logger, metadata["counters"],
True)
spinner.ok(_CHECKMARK)
return sts_job_status.StsJobStatus.SUCCESS
else:
# Update the status of the copy
if "counters" in metadata:
_print_sts_counters(spinner, cloud_logger, metadata["counters"],
False)
return sts_job_status.StsJobStatus.IN_PROGRESS
def _print_sts_counters(spinner, cloud_logger, counters, is_job_done):
"""Print out the current STS job counters.
Args:
spinner: The spinner displayed in the console
cloud_logger: A GCP logging client instance
counters: The counters object returned as part of the STS job status query
is_job_done: If True, print out the final counters instead of just the in progress ones
"""
if counters:
bytes_copied_to_sink = int(counters.get("bytesCopiedToSink", "0"))
objects_copied_to_sink = int(counters.get("objectsCopiedToSink", "0"))
bytes_found_from_source = int(counters.get("bytesFoundFromSource", "0"))
objects_found_from_source = int(
counters.get("objectsFoundFromSource", "0"))
bytes_deleted_from_source = int(
counters.get("bytesDeletedFromSource", "0"))
objects_deleted_from_source = int(
counters.get("objectsDeletedFromSource", "0"))
if is_job_done:
byte_status = (bytes_copied_to_sink == bytes_found_from_source ==
bytes_deleted_from_source)
object_status = (objects_copied_to_sink == objects_found_from_source
== objects_deleted_from_source)
if byte_status and object_status:
new_text = "Success! STS job copied {} bytes in {} objects".format(
bytes_copied_to_sink, objects_copied_to_sink)
else:
new_text = (
"Error! STS job copied {} of {} bytes in {} of {} objects and deleted"
" {} bytes and {} objects").format(
bytes_copied_to_sink,
bytes_found_from_source,
objects_copied_to_sink,
objects_found_from_source,
bytes_deleted_from_source,
objects_deleted_from_source,
)
if spinner.text != new_text:
spinner.write(spinner.text)
spinner.text = new_text
cloud_logger.log_text(new_text)
else:
if bytes_copied_to_sink > 0 and objects_copied_to_sink > 0:
byte_percent = "{:.0%}".format(
float(bytes_copied_to_sink) /
float(bytes_found_from_source))
object_percent = "{:.0%}".format(
float(objects_copied_to_sink) /
float(objects_found_from_source))
spinner.write(spinner.text)
new_text = "{} of {} bytes ({}) copied in {} of {} objects ({})".format(
bytes_copied_to_sink,
bytes_found_from_source,
byte_percent,
objects_copied_to_sink,
objects_found_from_source,
object_percent,
)
spinner.text = new_text
cloud_logger.log_text(new_text)
def _print_and_log(cloud_logger, message):
"""Print the message and log it to the cloud.
Args:
cloud_logger: A GCP logging client instance
message: The message to log
"""
cloud_logger.log_text(message)
def _write_spinner_and_log(spinner, cloud_logger, message):
"""Write the message to the spinner and log it to the cloud.
Args:
spinner: The spinner object to write the message to
cloud_logger: A GCP logging client instance
message: The message to print and log
"""
spinner.write(message)
cloud_logger.log_text(message)
if __name__ == "__main__":
main(None, None, None)