in src/datamigration/dags/teradata/teradata_transfer_run_log_dag.py [0:0]
def _filter_tables(ti, **kwargs):
data_transfer_config_json = ti.xcom_pull(
task_ids="load_parameters", key="data_transfer_config_json"
)
translation_type = data_transfer_config_json["type"]
validation_type = data_transfer_config_json["validation_config"]["validation_type"]
validation_params_file_path = data_transfer_config_json["validation_config"][
"validation_params_file_path"
]
bucket_name, blob_name = gcs_util.parse_bucket_and_blob_from_path(
validation_params_file_path
)
validation_params_from_gcs = gcs_util.get_validation_params_from_gcs(
bucket_name, blob_name, translation_type, validation_type
)
config_table_mapping = []
for source_table, source_table_params in validation_params_from_gcs.items():
config_table_mapping.append(
f"{source_table}={source_table_params['target-table']}"
)
src_schema = ast.literal_eval(kwargs["dag_run"].conf["config"])["SOURCE_SCHEMA"]
logging.info(
f"Method _filter_tables: received SOURCE_SCHEMA as {src_schema} from DAG config"
)
tables = ast.literal_eval(kwargs["dag_run"].conf["config"])["TABLE_NAME_PATTERN"]
logging.info(
f"Method _filter_tables: received TABLE_NAME_PATTERN as {tables} from DAG config"
)
valid_comparisons_list = table_filter.filter_valid_table_mappings(
config_table_mapping, tables, src_schema
)
ti.xcom_push(key="dvt_table_list", value=valid_comparisons_list)