scripts/provisioning/command_line.py (176 lines of code) (raw):

# Copyright 2019, Google, Inc. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # """Command-line sample that creates STS Job Pool and syncs with SDRS. Note the start times are in UTC (GMT-7 for PDT months) Example command line arguments from the prompt: 'python command_line.py create sdrs-server 2019/05/15 ds-dev-rpo ds-bucket-dev http://localhost:8080' For more information, see the README.md. """ import argparse import datetime import json import logging import requests import sys import utils import googleapiclient.discovery from google.cloud import storage logging.basicConfig() logging.getLogger('googleapicliet.discovery_cache').setLevel(logging.ERROR) LOGGER = logging.getLogger('sdrs_provisioning_cli') # [START main] def main(command, project_id, start_date, source_bucket, sink_bucket, service_name): storage_client = storage.Client() #service_name = 'http://david-sdrs-api.endpoints.sdrs-server.cloud.goog' try: bucket = storage_client.get_bucket(source_bucket) print("Bucket exists, proceeding") except Exception as e: LOGGER.error("Exception, exiting program " + str(e)) sys.exit() if command == 'create': pooled_sts_jobs = _create_sts_jobs_for_bucket(project_id, start_date, source_bucket, sink_bucket, 'dataset') _register_sdrs_sts_jobs(source_bucket, project_id, pooled_sts_jobs, service_name) elif command == 'delete': _delete_sts_jobs_for_bucket(project_id, source_bucket, service_name) else: print("Unknown command " + str(command)) sys.exit() # [END main] # [START _create_sts_jobs_for_bucket] def _create_sts_jobs_for_bucket(project_id, start_date, source_bucket, sink_bucket, job_type): storagetransfer = googleapiclient.discovery.build('storagetransfer', 'v1', cache_discovery=False) sts_jobs = [] number_of_jobs = 25 i = 0 while i < number_of_jobs: description = 'Pooled STS Job ' + str(i) + ' for bucket ' + source_bucket if i == 24: # create the default job - number 25 job_type = 'default' start_time_string = '{:02d}:59:59'.format(23) else: start_time_string = '{:02d}:00:00'.format(i) start_time = datetime.datetime.strptime(start_time_string, '%H:%M:%S') #Transfer time is in UTC Time (24hr) HH:MM:SS. transfer_job = { 'description': description, 'status': 'DISABLED', 'projectId': project_id, 'schedule': { 'scheduleStartDate': { 'day': start_date.day, 'month': start_date.month, 'year': start_date.year }, 'startTimeOfDay': { 'hours': start_time.hour, 'minutes': start_time.minute, 'seconds': start_time.second } }, 'transferSpec': { 'gcsDataSource': { 'bucketName': source_bucket }, 'gcsDataSink': { 'bucketName': sink_bucket } } } try: result = storagetransfer.transferJobs().create(body=transfer_job).execute() pooled_sts_job = { 'name': result.get("name"), 'status': result.get("status"), 'type': job_type, 'projectId': result.get("projectId"), 'sourceBucket': result.get("transferSpec").get("gcsDataSource").get("bucketName"), 'sourceProject': project_id, 'targetBucket': result.get("transferSpec").get("gcsDataSink").get("bucketName"), 'schedule': start_time_string } sts_jobs.append(pooled_sts_job) i += 1 except Exception as e: # If an exception is encountered during any API iteration, roll back the transaction and error out LOGGER.error("Exception, rolling back and exiting program " + str(e)) _exit_creation_with_cleanup(sts_jobs) print("Successfully created STS job pool in the cloud, standby") return sts_jobs # [END _create_sts_jobs_for_bucket] # [START _exit_creation_with_cleanup] def _exit_creation_with_cleanup(sts_jobs): storagetransfer = googleapiclient.discovery.build('storagetransfer', 'v1', cache_discovery=False) for sts_job in sts_jobs: job_name = sts_job.get("name") update_transfer_job_request_body = { 'project_id': sts_job.get("projectId"), 'update_transfer_job_field_mask': 'status', 'transfer_job': { 'status': 'DELETED' } } request = storagetransfer.transferJobs().patch(jobName=job_name, body=update_transfer_job_request_body) response = request.execute() sys.exit() # [END _exit_creation_with_cleanup] # [START _delete_sts_jobs_for_bucket] def _delete_sts_jobs_for_bucket(project_id, source_bucket, service_name): storagetransfer = googleapiclient.discovery.build('storagetransfer', 'v1', cache_discovery=False) # For the bucket, get the list of sts jobs to delete from SDRS sts_jobs = _get_pooled_sts_jobs(project_id, source_bucket, service_name) # Use the name of the jobs to delete them from the cloud for sts_job in sts_jobs: job_name = sts_job.get("name") update_transfer_job_request_body = { 'project_id': project_id, 'update_transfer_job_field_mask': 'status', 'transfer_job': { 'status': 'DELETED' } } request = storagetransfer.transferJobs().patch(jobName=job_name, body=update_transfer_job_request_body) response = request.execute() #Finally, delete the STS job records from SDRS print("Deleted STS Jobs from the cloud, now deleting from SDRS metadata, standby") _unregister_sdrs_sts_jobs(project_id, source_bucket, service_name) # [END _delete_sts_jobs_for_bucket] # [START _get_pooled_sts_jobs] def _get_pooled_sts_jobs(project_id, source_bucket, service_name): """Makes a request to get the pooled STS jobs from SDRS.""" url = '{}/stsjobpool?sourceBucket={}&sourceProject={}'.format( service_name, source_bucket, project_id) LOGGER.debug('GET: %s', url) response = requests.get(url, headers=utils.get_auth_header(service_name)) LOGGER.debug('Response: %s', response.text) if response.status_code == requests.codes.ok: return response.json() else: LOGGER.error('Unexpected response code %s returned: %s', response.status_code, response.text) # [END _get_pooled_sts_jobs] # [START _register_sdrs_sts_jobs] def _register_sdrs_sts_jobs(source_bucket, project_id, pooled_sts_jobs, service_name): """Makes a request to register the STS job with SDRS so it can be utilized.""" url = '{}/stsjobpool?sourceBucket={}&sourceProject={}'.format( service_name, source_bucket, project_id) print("Registering STS job pool with SDRS, standby") LOGGER.debug('POST: %s', url) LOGGER.debug('Body: %s', pooled_sts_jobs) response = requests.post(url, json=pooled_sts_jobs, headers=utils.get_auth_header(service_name)) LOGGER.debug('Response: %s', response.text) if response.status_code == requests.codes.ok: print('Successful provisioning of jobs with SDRS: {}'.format( response.text)) else: LOGGER.error('Unexpected response code %s returned: %s', response.status_code, response.text) LOGGER.error("Rolling back and exiting program") _exit_creation_with_cleanup(pooled_sts_jobs) # [END _register_sdrs_sts_jobs] # [START _unregister_sdrs_sts_jobs] def _unregister_sdrs_sts_jobs(project_id, source_bucket, service_name): """Makes a request to unregister the STS job pool from SDRS.""" url = '{}/stsjobpool?sourceBucket={}&sourceProject={}'.format( service_name, source_bucket, project_id) LOGGER.debug('DELETE: %s', url) response = requests.delete(url, headers=utils.get_auth_header(service_name)) LOGGER.debug('Response: %s', response.text) if response.status_code == requests.codes.ok: print('Successful unregistering of STS jobs with SDRS: {}'.format( response.text)) else: LOGGER.error('Unexpected response code %s returned: %s', response.status_code, response.text) # [END _unregister_sdrs_sts_jobs] if __name__ == '__main__': parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument('command', help='create or delete.') parser.add_argument('project_id', help='Your Google Cloud project ID.') parser.add_argument('start_date', help='Date YYYY/MM/DD.') parser.add_argument('source_bucket', help='Source GCS bucket name.') parser.add_argument('sink_bucket', help='Target GCS bucket name.') parser.add_argument('service_name', help='The SDRS service name.') args = parser.parse_args() start_date = datetime.datetime.strptime(args.start_date, '%Y/%m/%d') main( args.command, args.project_id, start_date, args.source_bucket, args.sink_bucket, args.service_name) # [END all]