def generate_cdc_dag_files()

in src/generate_query.py [0:0]


def generate_cdc_dag_files(raw_table_name, cdc_table_name, load_frequency,
                           gen_test, allow_telemetry, bq_location):
    """Generates file containing DAG code to refresh CDC table from RAW table.

    Args:
        raw_table_name: Full table name of raw table (dataset.table_name).
        cdc_table_name: Full table name of cdc table (dataset.table_name).
        load_frequency: DAG run frequency.
        gen_test: If test data is needed.
        allow_telemetry: If telemetry allowed.
        bq_location: BigQuery location for DAG execution.


    Raises:
        cortex_exc.KeyCEError: If primary keys not found in table DD03L.
        Exception: If error in executing DAG to populate CDC tables.

    """

    dag_file_name_part = 'cdc_' + raw_table_name.replace('.', '_')
    dag_py_file_name = dag_file_name_part + '.py'
    dag_sql_file_name = dag_file_name_part + '.sql'

    today = datetime.datetime.now()
    substitutes = {
        'base_table': raw_table_name,
        'year': today.year,
        'month': today.month,
        'day': today.day,
        'query_file': dag_sql_file_name,
        'load_frequency': load_frequency,
        'runtime_labels_dict': '', # A place holder for label key
        'bq_location': bq_location
    }
     # Add bq_labels to substitutes dict if Telemetry allowed
     # Converts dict to string for substitution purposes
    if allow_telemetry:
        substitutes['runtime_labels_dict'] = str(constants.CORTEX_JOB_LABEL)

    # Create python DAG file.
    generate_dag_py_file(_SQL_DAG_PYTHON_TEMPLATE, dag_py_file_name,
                         **substitutes)

    # Create query for SQL script that will be used in the DAG.
    fields = []
    update_fields = []

    raw_table_schema = client.get_table(raw_table_name).schema
    for field in raw_table_schema:
        if field.name not in _CDC_EXCLUDED_COLUMN_LIST:
            fields.append(f'`{field.name}`')
            update_fields.append((f'T.`{field.name}` = S.`{field.name}`'))

    if not fields:
        print(f'Schema could not be retrieved for {raw_table_name}')

    primary_keys = get_primary_keys(raw_table_name)
    if not primary_keys:
        e_msg = f'Primary keys for {raw_table_name} not found in table DD03L'
        raise cortex_exc.KeyCError(e_msg) from None

    primary_key_join_list = []
    primary_key_not_null_list = []
    for key in primary_keys:
        primary_key_join_list.append(f'T.`{key}` = S.`{key}`')
        primary_key_not_null_list.append(f'`{key}` IS NOT NULL')

    primary_keys_join_clause = ' AND '.join(primary_key_join_list)
    primary_keys_not_null_clause = ' AND '.join(primary_key_not_null_list)

    with open(_SQL_DAG_SQL_TEMPLATE, mode='r',
              encoding='utf-8') as sql_template_file:
        sql_template = Template(sql_template_file.read())

    generated_sql = sql_template.substitute(
        base_table=raw_table_name,
        target_table=cdc_table_name,
        primary_keys_join_clause=primary_keys_join_clause,
        primary_keys_not_null_clause=primary_keys_not_null_clause,
        fields=', '.join(fields),
        update_fields=', '.join(update_fields),
        primary_keys=', '.join(primary_keys))

    # Create sql file containing the query
    cdc_sql_file = _GENERATED_SQL_DIR + '/' + dag_sql_file_name
    with open(cdc_sql_file, mode='w+', encoding='utf-8') as generated_sql_file:
        generated_sql_file.write(generated_sql)
        generated_sql_file.close()
        print(f'Created DAG sql file {cdc_sql_file}')

    # If test data is needed, we want to populate CDC tables as well
    # from data in the RAW tables.
    # Good thing is - we already have the sql query available to do that.
    if gen_test.upper() == 'TRUE':
        query_job = client.query(generated_sql)
        # Let's wait for query to complete.
        try:
            _ = query_job.result()
        except Exception as e:  #pylint:disable=broad-except
            print('Error in running DAG sql')
            print(f'DAG sql: {generated_sql}')
            print(f'Error: {str(e)}')
            raise e
        print(f'{cdc_table_name} table is populated with data '
              f'from {raw_table_name} table')