clouddq-migration/main.py

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path

import click

from lib import validate_task
from lib import generate_config
from lib import generate_id
from lib import validateConfigFile
from lib import merge_configs
from dataplex import get_yaml_data
from dataplex import convert_config_to_payload
from dataplex import create_datascan
from dataplex import list_lakes
from dataplex import list_tasks


class ListParamType(click.ParamType):
    """Click parameter type that parses a comma-separated list of task IDs."""

    name = 'list'

    def convert(self, value, param, ctx):
        try:
            task_ids = value.split(',')
            return task_ids
        except Exception:
            self.fail(
                'Could not parse list. Expected format: '
                'project_id.location_id.lake_id.task_id,'
                'project_id.location_id.lake_id.task_id,...'
            )


@click.command()
@click.option(
    "--gcp_project_id",
    help="GCP Project ID where the AutoDQ tasks will be created.",
    default=None,
    type=str,
)
@click.option(
    "--region_id",
    help="GCP region ID where the AutoDQ tasks will be created.",
    default="us-central1",
    type=str,
)
@click.option(
    "--source_project",
    help="GCP Project ID where the CloudDQ tasks exist.",
    default=None,
    type=str,
)
@click.option(
    "--source_region",
    help="GCP region ID where the CloudDQ tasks exist.",
    default=None,
    type=str,
)
@click.option(
    "--task_ids",
    help="A list of existing CloudDQ task IDs. "
    "Expected format: project_id.location_id.lake_id.task_id,"
    "project_id.location_id.lake_id.task_id,...",
    default=None,
    type=ListParamType(),
)
@click.option(
    "--config_path",
    help="Users can choose to update the configuration via a YAML file by specifying the file path.",
    type=click.Path(exists=True),
    default=None,
)
def main(
    gcp_project_id: str,
    region_id: str,
    source_project: str,
    source_region: str,
    task_ids: list,
    config_path: Path,
) -> None:
    """Migrates existing CloudDQ tasks to Dataplex AutoDQ datascans."""
    if not gcp_project_id or not region_id:
        raise ValueError(
            "CLI input must define the required configs using the parameters: "
            "'--gcp_project_id', '--region_id'."
        )

    if task_ids:
        for task in task_ids:
            print('Validating the task(s)')
            validate_task(task)
            task_fields = task.split('.')
            source_project = task_fields[0]
            source_region = task_fields[1]
            lake_id = task_fields[2]
            task_id = task_fields[3]

            # Get the data quality YAML spec and trigger spec for the CloudDQ task.
            yaml_data, trigger_spec = get_yaml_data(
                source_project, source_region, lake_id, task_id
            )

            # Generate the AutoDQ configuration from the YAML spec.
            config = generate_config(yaml_data)
            if trigger_spec.type_ == 2:
                cron = trigger_spec.schedule
                config.update(
                    {
                        'executionSpec': {
                            'trigger': {
                                'schedule': {
                                    'cron': cron
                                }
                            }
                        }
                    }
                )

            # Generate the request payload.
            payload = convert_config_to_payload(config)

            # Generate the AutoDQ datascan ID.
            datascan_id = generate_id()
            print(f'Generated task Id for clouddq task {task_id}: {datascan_id}')

            # Create the datascan.
            response = create_datascan(gcp_project_id, region_id, datascan_id, payload)
            if response is not None:
                print(f"{datascan_id} has been created successfully.")

    elif config_path:
        # Validate the configuration file.
        print(f'Checking the configuration file located at {config_path}')
        config_file = validateConfigFile(config_path)
        for new_config in config_file:
            taskId = new_config['taskId']
            task_fields = taskId.split('.')
            source_project = task_fields[0]
            source_region = task_fields[1]
            lake_id = task_fields[2]
            task_id = task_fields[3]

            # Get the data quality YAML spec and trigger spec for the CloudDQ task.
            yaml_data, trigger_spec = get_yaml_data(
                source_project, source_region, lake_id, task_id
            )

            # Generate the AutoDQ configuration from the YAML spec.
            config = generate_config(yaml_data)

            # Merge the generated configuration with the user-provided overrides.
            final_config = merge_configs(config, new_config)
            if trigger_spec.type_ == 2:
                cron = trigger_spec.schedule
                final_config.update(
                    {
                        'executionSpec': {
                            'trigger': {
                                'schedule': {
                                    'cron': cron
                                }
                            }
                        }
                    }
                )

            # Generate the request payload.
            payload = convert_config_to_payload(final_config)

            # Generate the AutoDQ datascan ID.
            datascan_id = generate_id()
            print(f'Generated task Id for clouddq task {task_id}: {datascan_id}')

            # Create the datascan.
            response = create_datascan(gcp_project_id, region_id, datascan_id, payload)
            if response is not None:
                print(f"{datascan_id} has been created successfully.")

    else:
        if not source_project or not source_region:
            raise ValueError(
                "CLI input must define the required configs using the parameters: "
                "'--source_project', '--source_region'."
            )

        # List the lakes in the source project and region.
        lakes = list_lakes(source_project, source_region)
        if lakes:
            for lake in lakes:
                # List the CloudDQ tasks in the lake.
                tasks = list_tasks(source_project, source_region, lake)
                if tasks:
                    for task_id, task_details in tasks.items():
                        # Get the data quality YAML spec and trigger spec for the CloudDQ task.
                        yaml_data, trigger_spec = get_yaml_data(
                            source_project, source_region, lake, task_id
                        )

                        # Generate the AutoDQ configuration from the YAML spec.
                        config = generate_config(yaml_data)
                        if trigger_spec.type_ == 2:
                            cron = trigger_spec.schedule
                            config.update(
                                {
                                    'executionSpec': {
                                        'trigger': {
                                            'schedule': {
                                                'cron': cron
                                            }
                                        }
                                    }
                                }
                            )

                        # Generate the request payload.
                        payload = convert_config_to_payload(config)

                        # Generate the AutoDQ datascan ID.
                        datascan_id = generate_id()
                        print(f'Generated task Id for clouddq task {task_id}: {datascan_id}')

                        # Create the datascan.
                        response = create_datascan(
                            gcp_project_id, region_id, datascan_id, payload
                        )
                        if response is not None:
                            print(f"{datascan_id} has been created successfully.")
                else:
                    print(f'No CloudDQ tasks exist in lake: {lake}')
        else:
            print(f'No CloudDQ tasks exist in project: {source_project}')


if __name__ == "__main__":
    main()
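
# Example invocations (illustrative sketch only; the flags below are the ones
# defined above, but the project, region, lake, task, and file names are
# placeholders, not values from this repository):
#
#   Migrate specific CloudDQ tasks:
#     python main.py --gcp_project_id=my-project --region_id=us-central1 \
#       --task_ids=my-project.us-central1.my-lake.my-clouddq-task
#
#   Migrate tasks listed in a YAML configuration file with per-task overrides:
#     python main.py --gcp_project_id=my-project --region_id=us-central1 \
#       --config_path=./overrides.yaml
#
#   Discover and migrate all CloudDQ tasks in a source project and region:
#     python main.py --gcp_project_id=my-project --region_id=us-central1 \
#       --source_project=my-source-project --source_region=us-central1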