datascan/bulk-creation-scripts/datascan.py (61 lines of code) (raw):

from google.cloud import dataplex_v1 def createDatascan(gcp_project_id, location_id, datascan_id, datascan): """ Method to create a datascan """ try: # Create a client client = dataplex_v1.DataScanServiceClient() # Initialize request argument request = dataplex_v1.CreateDataScanRequest( parent=f"projects/{gcp_project_id}/locations/{location_id}", data_scan=datascan, data_scan_id=datascan_id, ) print(f'Creating Datascan: {datascan_id}') # Make the request operation = client.create_data_scan(request=request) response = operation.result() return response except Exception as error: print(f'Error: Failed to create {datascan_id}. ') print(error) return None def getDatascan(gcp_project_id, location_id, datascan_id): """ Method to get the list of datascans """ try: # Create a client client = dataplex_v1.DataScanServiceClient() # Initialize request argument request = dataplex_v1.GetDataScanRequest( name=f"projects/{gcp_project_id}/locations/{location_id}/dataScans/{datascan_id}", ) # Make the request operation = client.get_data_scan(request=request) return operation except Exception as error: return None def convertConfigToPayload(config, project_id, dataset_id, table_id): # Initialize request argument(s) data_scan = dataplex_v1.DataScan() data_scan.data.resource = f'//bigquery.googleapis.com/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}' if 'description' in config: data_scan.description = config['description'] if 'displayName' in config: data_scan.display_name = config['displayName'] if 'labels' in config: data_scan.labels = config['labels'] if 'dataProfileSpec' in config: if 'samplingPercent' in config['dataProfileSpec']: data_scan.data_profile_spec.sampling_percent = config['dataProfileSpec']['samplingPercent'] if 'rowFilter' in config['dataProfileSpec']: data_scan.data_profile_spec.row_filter = config['dataProfileSpec']['rowFilter'] if 'excludeFields' in config['dataProfileSpec'] and 'fieldNames' in config['dataProfileSpec']['excludeFields']: data_scan.data_profile_spec.exclude_fields.field_names = config['dataProfileSpec']['excludeFields'][ 'fieldNames'] if 'includeFields' in config['dataProfileSpec'] and 'fieldNames' in config['dataProfileSpec']['includeFields']: data_scan.data_profile_spec.include_fields.field_names = config['dataProfileSpec']['includeFields'][ 'fieldNames'] if 'postScanActions' in config['dataProfileSpec'] and 'bigqueryExport' in config['dataProfileSpec'][ 'postScanActions'] and 'resultsTable' in config['dataProfileSpec']['postScanActions']['bigqueryExport']: data_scan.data_profile_spec.post_scan_actions.bigquery_export.results_table = \ config['dataProfileSpec']['postScanActions']['bigqueryExport']['resultsTable'] else: data_scan.data_profile_spec.sampling_percent = 10 if 'executionSpec' in config and 'trigger' in config['executionSpec'] and 'schedule' in config['executionSpec'][ 'trigger'] and 'cron' in config['executionSpec']['trigger']['schedule']: data_scan.execution_spec.trigger.schedule.cron = config['executionSpec']['trigger']['schedule']['cron'] else: data_scan.execution_spec.trigger.on_demand = {} if 'executionSpec' in config and 'incrementalField' in config['executionSpec']: data_scan.execution_spec.field = config['executionSpec']['incrementalField'] return data_scan