def convert_config_to_payload()

in clouddq-migration/dataplex.py [0:0]


def convert_config_to_payload(config):
    '''
        Method to convert a config into payload
    '''
    # Initialize request argument(s)
    data_scan = dataplex_v1.DataScan()

    data_scan.data.resource = config.get('resource')
    data_scan.data.entity = config.get('entity')
    if 'description' in config:
        data_scan.description = config['description']
    if 'displayName' in config:
        data_scan.display_name = config['displayName']
    if 'labels' in config:
        data_scan.labels = config['labels']
    if 'samplingPercent' in config['dataQualitySpec']:
        data_scan.data_quality_spec.sampling_percent = config['dataQualitySpec']['samplingPercent']
    else:
        data_scan.data_quality_spec.sampling_percent = 10
    data_scan.data_quality_spec.rules = config['dataQualitySpec']['rules']
    if 'rowFilter' in config['dataQualitySpec']:
        data_scan.data_quality_spec.row_filter = config['dataQualitySpec']['rowFilter']
    if 'postScanActions' in config['dataQualitySpec']:
        data_scan.data_quality_spec.post_scan_actions.bigquery_export.results_table = config['dataQualitySpec']['postScanActions']['bigqueryExport']['resultsTable']
    if 'executionSpec' in config and 'trigger' in config['executionSpec'] and 'schedule' in config['executionSpec']['trigger']:
        data_scan.execution_spec.trigger.schedule.cron = config['executionSpec']['trigger']['schedule']['cron']
    else:
       data_scan.execution_spec.trigger.on_demand = {}
    if 'executionSpec' in config and 'incrementalField' in config['executionSpec']:
        data_scan.execution_spec.field = config['executionSpec']['incrementalField']
    return data_scan