def _filter_tables()

in src/datamigration/dags/redshift/redshift_transfer_run_log_dag.py [0:0]


def _filter_tables(ti, **kwargs):
    """Build the list of table mappings to validate and publish it via XCom.

    Steps, in order:

    1. Pull ``data_transfer_config_json`` from the ``load_parameters`` task's
       XCom and read its ``type`` plus the ``validation_type`` and
       ``validation_params_file_path`` entries of ``validation_config``.
    2. Split the params file path into bucket and blob names and fetch the
       validation parameters through ``gcs_util``.
    3. Turn every entry into a ``"<source_table>=<target-table>"`` string.
    4. Read ``SOURCE_SCHEMA`` and ``TABLE_NAME_PATTERN`` from the DAG run
       config and hand everything to
       ``table_filter.filter_valid_table_mappings``.
    5. Push the result to XCom under the key ``dvt_table_list``.

    Args:
        ti: Object exposing ``xcom_pull`` and ``xcom_push`` (presumably the
            Airflow task instance -- confirm against the operator wiring).
        **kwargs: Must contain ``dag_run``, whose ``conf["config"]`` is a
            string holding a Python literal mapping with the keys
            ``SOURCE_SCHEMA`` and ``TABLE_NAME_PATTERN``.

    Raises:
        KeyError: If any of the expected keys is missing from the pulled
            config, the validation params, or the DAG run config.
        ValueError, SyntaxError: If ``conf["config"]`` is not a valid Python
            literal (raised by ``ast.literal_eval``).
    """
    data_transfer_config_json = ti.xcom_pull(
        task_ids="load_parameters", key="data_transfer_config_json"
    )
    translation_type = data_transfer_config_json["type"]
    validation_config = data_transfer_config_json["validation_config"]
    validation_type = validation_config["validation_type"]
    validation_params_file_path = validation_config["validation_params_file_path"]

    bucket_name, blob_name = gcs_util.parse_bucket_and_blob_from_path(
        validation_params_file_path
    )
    validation_params_from_gcs = gcs_util.get_validation_params_from_gcs(
        bucket_name, blob_name, translation_type, validation_type
    )

    # One "source=target" entry per table described in the validation params.
    config_table_mapping = [
        f"{source_table}={source_table_params['target-table']}"
        for source_table, source_table_params in validation_params_from_gcs.items()
    ]

    # Parse the DAG run config once; it is a string containing a Python
    # literal, so literal_eval (not eval) is used to decode it.
    dag_config = ast.literal_eval(kwargs["dag_run"].conf["config"])

    src_schema = dag_config["SOURCE_SCHEMA"]
    logging.info(
        f"Method _filter_tables: received SOURCE_SCHEMA as {src_schema} from DAG config"
    )
    tables = dag_config["TABLE_NAME_PATTERN"]
    logging.info(
        f"Method _filter_tables: received TABLE_NAME_PATTERN as {tables} from DAG config"
    )

    valid_comparisons_list = table_filter.filter_valid_table_mappings(
        config_table_mapping, tables, src_schema
    )
    ti.xcom_push(key="dvt_table_list", value=valid_comparisons_list)