in src/generate_query.py [0:0]
def generate_cdc_dag_files(raw_table_name, cdc_table_name, load_frequency,
                           gen_test, allow_telemetry, bq_location):
    """Generates file containing DAG code to refresh CDC table from RAW table.

    Writes a python DAG file and a companion SQL file (both named after the
    raw table), and optionally executes the generated SQL once to seed the
    CDC table with test data.

    Args:
        raw_table_name: Full table name of raw table (dataset.table_name).
        cdc_table_name: Full table name of cdc table (dataset.table_name).
        load_frequency: DAG run frequency.
        gen_test: If test data is needed. NOTE(review): compared against the
            string 'TRUE', so this is presumably a string flag, not a bool —
            confirm against callers.
        allow_telemetry: If telemetry allowed.
        bq_location: BigQuery location for DAG execution.

    Raises:
        cortex_exc.KeyCError: If primary keys not found in table DD03L.
        Exception: If error in executing DAG to populate CDC tables.
    """
    # Both generated artifacts share a name derived from the raw table,
    # e.g. 'dataset.table' -> 'cdc_dataset_table.py' / '.sql'.
    dag_file_name_part = 'cdc_' + raw_table_name.replace('.', '_')
    dag_py_file_name = dag_file_name_part + '.py'
    dag_sql_file_name = dag_file_name_part + '.sql'
    today = datetime.datetime.now()
    substitutes = {
        'base_table': raw_table_name,
        'year': today.year,
        'month': today.month,
        'day': today.day,
        'query_file': dag_sql_file_name,
        'load_frequency': load_frequency,
        'runtime_labels_dict': '',  # A place holder for label key
        'bq_location': bq_location
    }
    # Add bq_labels to substitutes dict if Telemetry allowed.
    # Converts dict to string for substitution purposes.
    if allow_telemetry:
        substitutes['runtime_labels_dict'] = str(constants.CORTEX_JOB_LABEL)
    # Create python DAG file.
    generate_dag_py_file(_SQL_DAG_PYTHON_TEMPLATE, dag_py_file_name,
                         **substitutes)
    # Create query for SQL script that will be used in the DAG.
    fields = []
    update_fields = []
    raw_table_schema = client.get_table(raw_table_name).schema
    for field in raw_table_schema:
        if field.name not in _CDC_EXCLUDED_COLUMN_LIST:
            fields.append(f'`{field.name}`')
            update_fields.append(f'T.`{field.name}` = S.`{field.name}`')
    if not fields:
        # NOTE(review): execution continues after this warning, so the
        # generated SQL will have empty field lists — presumably deliberate
        # best-effort behavior; confirm before tightening to an exception.
        print(f'Schema could not be retrieved for {raw_table_name}')
    primary_keys = get_primary_keys(raw_table_name)
    if not primary_keys:
        e_msg = f'Primary keys for {raw_table_name} not found in table DD03L'
        raise cortex_exc.KeyCError(e_msg) from None
    # Build the MERGE predicates: T/S aliases match the SQL template's
    # target/source tables.
    primary_key_join_list = []
    primary_key_not_null_list = []
    for key in primary_keys:
        primary_key_join_list.append(f'T.`{key}` = S.`{key}`')
        primary_key_not_null_list.append(f'`{key}` IS NOT NULL')
    primary_keys_join_clause = ' AND '.join(primary_key_join_list)
    primary_keys_not_null_clause = ' AND '.join(primary_key_not_null_list)
    with open(_SQL_DAG_SQL_TEMPLATE, mode='r',
              encoding='utf-8') as sql_template_file:
        sql_template = Template(sql_template_file.read())
        generated_sql = sql_template.substitute(
            base_table=raw_table_name,
            target_table=cdc_table_name,
            primary_keys_join_clause=primary_keys_join_clause,
            primary_keys_not_null_clause=primary_keys_not_null_clause,
            fields=', '.join(fields),
            update_fields=', '.join(update_fields),
            primary_keys=', '.join(primary_keys))
    # Create sql file containing the query.
    cdc_sql_file = _GENERATED_SQL_DIR + '/' + dag_sql_file_name
    # The `with` statement closes the file; no explicit close() needed.
    with open(cdc_sql_file, mode='w+', encoding='utf-8') as generated_sql_file:
        generated_sql_file.write(generated_sql)
    print(f'Created DAG sql file {cdc_sql_file}')
    # If test data is needed, we want to populate CDC tables as well
    # from data in the RAW tables.
    # Good thing is - we already have the sql query available to do that.
    if gen_test.upper() == 'TRUE':
        query_job = client.query(generated_sql)
        # Let's wait for query to complete.
        try:
            _ = query_job.result()
        except Exception as e:  #pylint:disable=broad-except
            print('Error in running DAG sql')
            print(f'DAG sql: {generated_sql}')
            print(f'Error: {str(e)}')
            # Bare raise preserves the original traceback.
            raise
        print(f'{cdc_table_name} table is populated with data '
              f'from {raw_table_name} table')