in clouddq-migration/dataplex.py [0:0]
def convert_config_to_payload(config):
'''
Method to convert a config into payload
'''
# Initialize request argument(s)
data_scan = dataplex_v1.DataScan()
data_scan.data.resource = config.get('resource')
data_scan.data.entity = config.get('entity')
if 'description' in config:
data_scan.description = config['description']
if 'displayName' in config:
data_scan.display_name = config['displayName']
if 'labels' in config:
data_scan.labels = config['labels']
if 'samplingPercent' in config['dataQualitySpec']:
data_scan.data_quality_spec.sampling_percent = config['dataQualitySpec']['samplingPercent']
else:
data_scan.data_quality_spec.sampling_percent = 10
data_scan.data_quality_spec.rules = config['dataQualitySpec']['rules']
if 'rowFilter' in config['dataQualitySpec']:
data_scan.data_quality_spec.row_filter = config['dataQualitySpec']['rowFilter']
if 'postScanActions' in config['dataQualitySpec']:
data_scan.data_quality_spec.post_scan_actions.bigquery_export.results_table = config['dataQualitySpec']['postScanActions']['bigqueryExport']['resultsTable']
if 'executionSpec' in config and 'trigger' in config['executionSpec'] and 'schedule' in config['executionSpec']['trigger']:
data_scan.execution_spec.trigger.schedule.cron = config['executionSpec']['trigger']['schedule']['cron']
else:
data_scan.execution_spec.trigger.on_demand = {}
if 'executionSpec' in config and 'incrementalField' in config['executionSpec']:
data_scan.execution_spec.field = config['executionSpec']['incrementalField']
return data_scan