datascan/bulk-creation-scripts/main.py (99 lines of code) (raw):

import click from pathlib import Path from datascan import convertConfigToPayload from datascan import getDatascan from datascan import createDatascan from lib import validateConfigFile from lib import validateCLI from lib import generateDataScanId class ListParamType(click.ParamType): name = 'list' def convert(self, value, param, ctx): try: bq_tables = value.split(',') return bq_tables except Exception as e: self.fail('Could not parse list. Expected format: project_name.dataset_name.table_name,project_name.dataset_name.table_name,...') @click.command() @click.option( "--gcp_project_id", help="GCP Project ID where the data profile scans will be created. " "If --config_path is provided, this option will be ignored. ", default=None, type=str, ) @click.option( "--location_id", help="GCP region where the data profile scans will be created. " "If --config_path is provided, this option will be ignored. ", default=None, type=str, ) @click.option( "--bq_tables", help="A list of BQ tables. Each BQ table name should be unique; otherwise, an error will occur while creating the datascan with the same BQ table name." "Format: project_name.dataset_name.table_name,project_name.dataset_name.table_name,... " "If --config_path is provided, this option will be ignored. ", type=ListParamType(), ) @click.option( "--config_path", help="Users can choose to provide the configuration via a YAML file by specifying the file path. ", type=click.Path(exists=True), default=None, ) def main( gcp_project_id: str, location_id: str, bq_tables: list, config_path: Path, ) -> None: if config_path: # validate the config file print(f'Checking the configuration file located at {config_path}') configs = validateConfigFile(config_path) for config in configs: gcp_project_id = config['projectId'] location_id = config['locationId'] full_table_name = config['bqTable'] project_id = full_table_name.split('.')[0] dataset_id = full_table_name.split('.')[1] table_id = full_table_name.split('.')[2] # generate datascan id datascan_id = generateDataScanId(table_id) # check if datascan id already exists datascan = getDatascan(gcp_project_id, location_id, datascan_id) if datascan: print(f'{datascan_id} resource already exists. ') else: # generate payload payload = convertConfigToPayload(config, project_id, dataset_id, table_id) # create datascan response = createDatascan( gcp_project_id, location_id, datascan_id, payload ) if response is not None: print(f"{datascan_id} has been created successfully.") print(f"LRO ID for {datascan_id}: ",response.name.split('/')[-1]) else: #validate CLI arguments validateCLI(gcp_project_id, location_id, bq_tables) for full_table_name in bq_tables: project_id = full_table_name.split('.')[0] dataset_id = full_table_name.split('.')[1] table_id = full_table_name.split('.')[2] # generate datascan id datascan_id = generateDataScanId(table_id) # check if datascan id already exists datascan = getDatascan(gcp_project_id, location_id, datascan_id) if datascan: print(f'{datascan_id} resource already exists. ') else: # generate payload config = [] payload = convertConfigToPayload(config, project_id, dataset_id, table_id) # create datascan response = createDatascan( gcp_project_id, location_id, datascan_id, payload ) if response is not None: print(f"{datascan_id} has been created successfully.") print(f"LRO ID for {datascan_id}: ",response.name.split('/')[-1]) if __name__ == "__main__": main()