src/datamigration/dags/redshift/redshift_transfer_run_log_dag.py [306:488]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                    continue

                # Kept for reference only: maps the source table name to the
                # target table name from "data will be appended" log lines.
                # elif "data will be appended to existing table" in log_message:
                #     # For target table name
                #     target_summary_match = re.match(
                #         r"Table (.*) already exists in (.*);",
                #         log_message,
                #     )
                #     target_dataset_name = target_summary_match.group(2).split(".")[1]
                #     target_table_name = (
                #         target_dataset_name + "." + target_summary_match.group(1)
                #     )
                #     logging.info(target_table_name)
                #     continue

                # Per-table job stats: record counts for each transferred table
                elif "Number of records" in log_message:
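                    # Illustrative (assumed) log shape this regex targets;
                    # the exact DTS wording is not shown in this excerpt:
                    #   "Job <job_id> (table <name>) ... Number of records: <n>, error records: <m>."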
                    job_summary_match = re.match(
                        r"Job (.*) \(table (.*)\) .* records: (\d*),.* (\d*).",
                        log_message,
                    )
                    if job_summary_match:
                        table_name = job_summary_match.group(2)
                        if table_name not in job_stats_jsons:
                            job_stats_jsons[table_name] = job_stats_json_template.copy()
                        stats = job_stats_jsons[table_name]
                        stats["src_table_name"] = table_name
                        stats["bq_job_id"] = job_summary_match.group(1)
                        stats["success_records"] = job_summary_match.group(3)
                        stats["error_records"] = job_summary_match.group(4)
                    continue
                # Run-level summary: overall succeeded/failed job counts
                elif "Summary:" in log_message:
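                    # Illustrative (assumed) summary line this regex targets, e.g.:
                    #   "Summary: succeeded 12 jobs, failed 0 jobs."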
                    run_summary_match = re.match(
                        r"^Summary: succeeded (\d*).*failed (\d*).*", log_message
                    )
                    if run_summary_match:
                        dts_run_summary_json["succeeded_jobs"] = run_summary_match.group(1)
                        dts_run_summary_json["failed_jobs"] = run_summary_match.group(2)
                    continue

        except Exception as e:
            logging.error(
                "Method _process_transfer_logs: failed to extract value from log"
                f"\n\tlog line: {log_message}\n\terror: {e}"
            )

    job_stats_json_rows = []
    for row_dict in job_stats_jsons.values():
        row_dict["run_date"] = run_date

        if not row_dict["message"]:
            row_dict["transfer_run_state"] = "SUCCEEDED"
        elif "Skipping" in row_dict["message"]:
            row_dict["transfer_run_state"] = "SKIPPED"
        else:
            row_dict["transfer_run_state"] = "FAILED"

        if row_dict["src_table_name"]:
            job_stats_json_rows.append(row_dict)

    logging.info(
        f"Method _process_transfer_logs: extracted {len(job_stats_json_rows)} transfer run job rows from logs"
    )
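
    # Illustrative (assumed) shape of one pushed job-stats row, based on the
    # keys populated above; values are examples, not real output:
    #   {"src_table_name": "orders", "bq_job_id": "...", "success_records": "100",
    #    "error_records": "0", "run_date": run_date, "transfer_run_state": "SUCCEEDED"}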

    ti.xcom_push(key="dts_run_summary_json", value=dts_run_summary_json)
    ti.xcom_push(key="job_stats_json_rows", value=job_stats_json_rows)


def _insert_bq_transfer_log_results(ti) -> None:
    """
    Inserts DTS run relevant information in bigquery dmt_logs tables
    """
    dts_run_summary_row = ti.xcom_pull(
        key="dts_run_summary_json", task_ids="process_transfer_logs"
    )
    job_stats_json_rows = ti.xcom_pull(
        key="job_stats_json_rows", task_ids="process_transfer_logs"
    )
    dts_log_file_path = ti.xcom_pull(
        key="dts_log_file_path", task_ids="create_transfer_log_GCS"
    )
    dts_run_summary_row["gcs_log_path"] = dts_log_file_path

    bq_utils.insert_bq_json_rows(BQ_TRANSFER_SUMMARY_TABLE_NAME, [dts_run_summary_row])
    if not job_stats_json_rows:
        logging.info(
            "Method _insert_bq_transfer_log_results: no job stats rows to populate."
        )
    else:
        bq_utils.insert_bq_json_rows(
            BQ_TRANSFER_JOB_STATS_TABLE_NAME, job_stats_json_rows
        )
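

# A minimal sketch of what bq_utils.insert_bq_json_rows is assumed to do: a thin
# wrapper over the BigQuery streaming-insert API. The helper is local to this
# repo and its real implementation is not shown here; this is illustrative only.
#
#     from google.cloud import bigquery
#
#     def insert_bq_json_rows(table_name, json_rows):
#         client = bigquery.Client()
#         errors = client.insert_rows_json(table_name, json_rows)
#         if errors:
#             raise RuntimeError(f"BigQuery insert errors: {errors}")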


def _filter_tables(ti, **kwargs):
    data_transfer_config_json = ti.xcom_pull(
        task_ids="load_parameters", key="data_transfer_config_json"
    )
    translation_type = data_transfer_config_json["type"]
    validation_type = data_transfer_config_json["validation_config"]["validation_type"]
    validation_params_file_path = data_transfer_config_json["validation_config"][
        "validation_params_file_path"
    ]
    bucket_name, blob_name = gcs_util.parse_bucket_and_blob_from_path(
        validation_params_file_path
    )
    validation_params_from_gcs = gcs_util.get_validation_params_from_gcs(
        bucket_name, blob_name, translation_type, validation_type
    )

    config_table_mapping = []
    for source_table, source_table_params in validation_params_from_gcs.items():
        config_table_mapping.append(
            f"{source_table}={source_table_params['target-table']}"
        )
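    # Each entry pairs a source table with its target, e.g. (illustrative values only):
    #   "source_schema.orders=target_dataset.orders"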

    dag_config = ast.literal_eval(kwargs["dag_run"].conf["config"])
    src_schema = dag_config["SOURCE_SCHEMA"]
    logging.info(
        f"Method _filter_tables: received SOURCE_SCHEMA as {src_schema} from DAG config"
    )
    tables = dag_config["TABLE_NAME_PATTERN"]
    logging.info(
        f"Method _filter_tables: received TABLE_NAME_PATTERN as {tables} from DAG config"
    )
    valid_comparisons_list = table_filter.filter_valid_table_mappings(
        config_table_mapping, tables, src_schema
    )
    ti.xcom_push(key="dvt_table_list", value=valid_comparisons_list)


def _check_transfer_run_state(ti):
    run_state = ti.xcom_pull(task_ids="load_parameters", key="run_state")
    logging.info(f"TRANSFER_RUN_STATE: {run_state}")
    if run_state == "SUCCEEDED":
        return "filter_tables_for_dvt"
    else:
        logging.info("Skipping Validation")
        return "skip_validation_dag"


def _check_filtered_tables(ti):
    valid_comparisons_list = ti.xcom_pull(
        task_ids="filter_tables_for_dvt", key="dvt_table_list"
    )
    if not valid_comparisons_list:
        # if empty list, skip calling validation dag and call dummy task (to mark end of DAG's functionality)
        logging.info(
            "Method _check_filtered_tables: no valid dvt comparisons list found"
        )
        return "skip_validation_dag"
    else:
        # if non-empty list, determine which validation dag to invoke and skip the dummy task
        logging.info(
            f"Method _check_filtered_tables: valid dvt comparisons list - {valid_comparisons_list}"
        )
        return "determine_validation_dag"


def get_validation_dag_id(validation_mode):
    return VALIDATION_TYPE_TO_DAG_ID_MAPPING.get(
        validation_mode, VALIDATION_TYPE_TO_DAG_ID_MAPPING[VALIDATION_DEFAULT_TYPE]
    )


def _determine_validation_dag(ti, **kwargs):
    config = ti.xcom_pull(task_ids="load_parameters", key="data_transfer_config_json")
    validation_mode = config["validation_config"].get("validation_mode")
    validation_dag_id = get_validation_dag_id(validation_mode)
    if validation_dag_id == VALIDATION_DAG_ID:
        return "invoke_validation_dag"
    else:
        return "invoke_validation_crun_dag"


with models.DAG(
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



src/datamigration/dags/teradata/teradata_transfer_run_log_dag.py [413:580]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                    continue

                elif "Number of records" in log_message:
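                    # Illustrative (assumed) log shape this regex targets;
                    # the exact DTS wording is not shown in this excerpt:
                    #   "Job <job_id> (table <name>) ... Number of records: <n>, error records: <m>."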
                    job_summary_match = re.match(
                        r"Job (.*) \(table (.*)\) .* records: (\d*),.* (\d*).",
                        log_message,
                    )
                    if job_summary_match:
                        table_name = job_summary_match.group(2)
                        if table_name not in job_stats_jsons:
                            job_stats_jsons[table_name] = job_stats_json_template.copy()
                        stats = job_stats_jsons[table_name]
                        stats["src_table_name"] = table_name
                        stats["bq_job_id"] = job_summary_match.group(1)
                        stats["success_records"] = job_summary_match.group(3)
                        stats["error_records"] = job_summary_match.group(4)
                    continue

                elif "Summary:" in log_message:
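                    # Illustrative (assumed) summary line this regex targets, e.g.:
                    #   "Summary: succeeded 12 jobs, failed 0 jobs."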
                    run_summary_match = re.match(
                        r"^Summary: succeeded (\d*).*failed (\d*).*", log_message
                    )
                    if run_summary_match:
                        dts_run_summary_json["succeeded_jobs"] = run_summary_match.group(1)
                        dts_run_summary_json["failed_jobs"] = run_summary_match.group(2)
                    continue

        except Exception as e:
            logging.error(
                "Method _process_transfer_logs: failed to extract value from log"
                f"\n\tlog line: {log_message}\n\terror: {e}"
            )

    job_stats_json_rows = []
    for row_dict in job_stats_jsons.values():
        row_dict["run_date"] = run_date

        if not row_dict["message"]:
            row_dict["transfer_run_state"] = "SUCCEEDED"
        elif "Skipping" in row_dict["message"]:
            row_dict["transfer_run_state"] = "SKIPPED"
        else:
            row_dict["transfer_run_state"] = "FAILED"

        if row_dict["src_table_name"]:
            job_stats_json_rows.append(row_dict)

    logging.info(
        f"Method _process_transfer_logs: extracted {len(job_stats_json_rows)} transfer run job rows from logs"
    )
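
    # Illustrative (assumed) shape of one pushed job-stats row, based on the
    # keys populated above; values are examples, not real output:
    #   {"src_table_name": "orders", "bq_job_id": "...", "success_records": "100",
    #    "error_records": "0", "run_date": run_date, "transfer_run_state": "SUCCEEDED"}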

    ti.xcom_push(key="dts_run_summary_json", value=dts_run_summary_json)
    ti.xcom_push(key="job_stats_json_rows", value=job_stats_json_rows)


def _insert_bq_transfer_log_results(ti) -> None:
    """
    Inserts DTS run relevant information in bigquery dmt_logs tables
    """
    dts_run_summary_row = ti.xcom_pull(
        key="dts_run_summary_json", task_ids="process_transfer_logs"
    )
    job_stats_json_rows = ti.xcom_pull(
        key="job_stats_json_rows", task_ids="process_transfer_logs"
    )
    dts_log_file_path = ti.xcom_pull(
        key="dts_log_file_path", task_ids="create_transfer_log_GCS"
    )
    dts_run_summary_row["gcs_log_path"] = dts_log_file_path

    bq_utils.insert_bq_json_rows(BQ_TRANSFER_SUMMARY_TABLE_NAME, [dts_run_summary_row])
    if not job_stats_json_rows:
        logging.info(
            "Method _insert_bq_transfer_log_results: no job stats rows to populate."
        )
    else:
        bq_utils.insert_bq_json_rows(
            BQ_TRANSFER_JOB_STATS_TABLE_NAME, job_stats_json_rows
        )
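

# A minimal sketch of what bq_utils.insert_bq_json_rows is assumed to do: a thin
# wrapper over the BigQuery streaming-insert API. The helper is local to this
# repo and its real implementation is not shown here; this is illustrative only.
#
#     from google.cloud import bigquery
#
#     def insert_bq_json_rows(table_name, json_rows):
#         client = bigquery.Client()
#         errors = client.insert_rows_json(table_name, json_rows)
#         if errors:
#             raise RuntimeError(f"BigQuery insert errors: {errors}")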


def _filter_tables(ti, **kwargs):
    data_transfer_config_json = ti.xcom_pull(
        task_ids="load_parameters", key="data_transfer_config_json"
    )
    translation_type = data_transfer_config_json["type"]
    validation_type = data_transfer_config_json["validation_config"]["validation_type"]
    validation_params_file_path = data_transfer_config_json["validation_config"][
        "validation_params_file_path"
    ]
    bucket_name, blob_name = gcs_util.parse_bucket_and_blob_from_path(
        validation_params_file_path
    )
    validation_params_from_gcs = gcs_util.get_validation_params_from_gcs(
        bucket_name, blob_name, translation_type, validation_type
    )

    config_table_mapping = []
    for source_table, source_table_params in validation_params_from_gcs.items():
        config_table_mapping.append(
            f"{source_table}={source_table_params['target-table']}"
        )
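    # Each entry pairs a source table with its target, e.g. (illustrative values only):
    #   "source_schema.orders=target_dataset.orders"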

    dag_config = ast.literal_eval(kwargs["dag_run"].conf["config"])
    src_schema = dag_config["SOURCE_SCHEMA"]
    logging.info(
        f"Method _filter_tables: received SOURCE_SCHEMA as {src_schema} from DAG config"
    )
    tables = dag_config["TABLE_NAME_PATTERN"]
    logging.info(
        f"Method _filter_tables: received TABLE_NAME_PATTERN as {tables} from DAG config"
    )
    valid_comparisons_list = table_filter.filter_valid_table_mappings(
        config_table_mapping, tables, src_schema
    )
    ti.xcom_push(key="dvt_table_list", value=valid_comparisons_list)


def _check_transfer_run_state(ti):
    run_state = ti.xcom_pull(task_ids="load_parameters", key="run_state")
    logging.info(f"TRANSFER_RUN_STATE: {run_state}")
    if run_state == "SUCCEEDED":
        return "filter_tables_for_dvt"
    else:
        logging.info("Skipping Validation")
        return "skip_validation_dag"


def _check_filtered_tables(ti):
    valid_comparisons_list = ti.xcom_pull(
        task_ids="filter_tables_for_dvt", key="dvt_table_list"
    )
    if not valid_comparisons_list:
        # if empty list, skip calling validation dag and call dummy task (to mark end of DAG's functionality)
        logging.info(
            "Method _check_filtered_tables: no valid dvt comparisons list found"
        )
        return "skip_validation_dag"
    else:
        # if non-empty list, determine validation dag to be called and skip invoking dummy task
        logging.info(
            f"Method _check_filtered_tables: valid dvt comparisons list - {valid_comparisons_list}"
        )
        return "determine_validation_dag"


def get_validation_dag_id(validation_mode):
    return VALIDATION_TYPE_TO_DAG_ID_MAPPING.get(
        validation_mode, VALIDATION_TYPE_TO_DAG_ID_MAPPING[VALIDATION_DEFAULT_TYPE]
    )


def _determine_validation_dag(ti, **kwargs):
    config = ti.xcom_pull(task_ids="load_parameters", key="data_transfer_config_json")
    validation_mode = config["validation_config"].get("validation_mode")
    validation_dag_id = get_validation_dag_id(validation_mode)
    if validation_dag_id == VALIDATION_DAG_ID:
        return "invoke_validation_dag"
    else:
        return "invoke_validation_crun_dag"


with models.DAG(
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



