src/datamigration/dags/redshift/redshift_transfer_run_log_dag.py [185:280]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        )
        logging.info(
            f"Method get_transfer_run_logs page {page_iteration}: request sent to fetch "
            f"transfer logs for transfer_run_id {full_run_id}"
        )
        logs = response["transferMessages"]
        transfer_logs.extend(logs)
        page_token = response.get("nextPageToken")
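        # Tail-recurse into the next page, accumulating entries in transfer_logs.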
        return get_transfer_run_logs(
            full_run_id,
            next_page_token=page_token,
            page_size=page_size,
            page_iteration=page_iteration + 1,
            transfer_logs=transfer_logs,
        )
    except HttpError as err:
        logging.exception("Error requesting transfer run logs")
        raise


def _get_transfer_run_logs(ti) -> None:
    """
    Calls get_transfer_run_logs to fetch transfer run logs from the DTS API and pushes them to XCom
    """
    full_run_id = ti.xcom_pull(key="full_run_id", task_ids="load_parameters")
    transfer_run_id = ti.xcom_pull(key="transfer_run_id", task_ids="load_parameters")
    transfer_run_logs = get_transfer_run_logs(full_run_id, page_size=10)
    if not transfer_run_logs:
        raise Exception(
            f"Method _get_transfer_run_logs: failed to retrieve transfer run logs for "
            f"transfer_run_id {transfer_run_id}"
        )
    ti.xcom_push(key="dts_run_logs", value=transfer_run_logs)


def get_logging_bucket_name():
    """
    Builds the logging bucket name from LOGGING_BUCKET_ID_PREFIX and the
    CUSTOMER_NAME environment variable
    """
    cust_name = os.environ.get("CUSTOMER_NAME")
    bucket_id = f"{LOGGING_BUCKET_ID_PREFIX}-{cust_name}"
    logging.info(
        f"Method get_logging_bucket_name: retrieved logging bucket name as {bucket_id}"
    )
    return bucket_id


def _create_transfer_log_GCS(ti) -> None:
    """
    Writes the DTS run logs to a file in the Cloud Storage logging bucket and pushes its path to XCom
    """
    bucket_id = get_logging_bucket_name()
    unique_id = ti.xcom_pull(key="unique_id", task_ids="load_parameters")
    transfer_run_id = ti.xcom_pull(key="transfer_run_id", task_ids="load_parameters")
    dts_run_logs = ti.xcom_pull(key="dts_run_logs", task_ids="get_transfer_run_logs")
    log_file_name = f"{unique_id}/{transfer_run_id}.json"
    formatted_logs = "\n".join(json.dumps(log) for log in dts_run_logs)
    log_file_path = gcs_util.write_object_in_gcsbucket(
        bucket_id, log_file_name, formatted_logs
    )
    logging.info(
        f"Method _create_transfer_log_GCS: written transfer run logs in GCS path: {log_file_path}"
    )
    ti.xcom_push(key="dts_log_file_path", value=log_file_path)


def _process_transfer_logs(ti) -> None:
    """
    Processes DTS run logs and extracts the relevant information for each table's migration
    """
    transfer_config_id = ti.xcom_pull(
        key="transfer_config_id", task_ids="load_parameters"
    )
    transfer_run_id = ti.xcom_pull(key="transfer_run_id", task_ids="load_parameters")
    unique_id = ti.xcom_pull(key="unique_id", task_ids="load_parameters")
    dts_run_summary_json = ti.xcom_pull(
        key="dts_run_summary_json", task_ids="get_transfer_run_summary"
    )
    job_stats_json_template = ti.xcom_pull(
        key="job_stats_json", task_ids="load_parameters"
    )
    dts_run_logs = ti.xcom_pull(key="dts_run_logs", task_ids="get_transfer_run_logs")

    job_stats_json_template["unique_id"] = unique_id
    job_stats_json_template["transfer_run_id"] = transfer_run_id
    job_stats_json_template["transfer_config_id"] = transfer_config_id

    job_stats_jsons = {}

    # TODO: add parsing logic for empty table migration
    # TODO: shorten error messages before inserting into BigQuery
    for log_dict in dts_run_logs:
        log_message = log_dict["messageText"]
        try:
            # Parsing error messages
            if log_dict["severity"] == "ERROR":
                job_stats_json_template["job_status"] = "FAILED"
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
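
The opening lines of get_transfer_run_logs fall outside the captured range. A minimal sketch of what that top half plausibly looks like, assuming the discovery-based BigQuery Data Transfer API client; the dts_service object, the default argument values, and the base-case check are assumptions, not code from this file:

def get_transfer_run_logs(
    full_run_id,
    next_page_token=None,
    page_size=10,
    page_iteration=1,
    transfer_logs=None,
):
    # Avoid a mutable default argument; start each call chain with a fresh list.
    if transfer_logs is None:
        transfer_logs = []
    # Base case: a previous page returned no nextPageToken, so all logs are in.
    if page_iteration > 1 and next_page_token is None:
        return transfer_logs
    try:
        # transferLogs.list on the full run resource name, e.g.
        # projects/{p}/locations/{l}/transferConfigs/{c}/runs/{r}
        response = (
            dts_service.projects()
            .locations()
            .transferConfigs()
            .runs()
            .transferLogs()
            .list(parent=full_run_id, pageSize=page_size, pageToken=next_page_token)
            .execute()
        )
        # ...continues as shown in the excerpts above.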



src/datamigration/dags/teradata/teradata_transfer_run_log_dag.py [216:309]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        )
        logging.info(
            f"Method get_transfer_run_logs page {page_iteration}: request sent to fetch "
            f"transfer logs for transfer_run_id {full_run_id}"
        )
        logs = response["transferMessages"]
        transfer_logs.extend(logs)
        page_token = response.get("nextPageToken")
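        # Tail-recurse into the next page, accumulating entries in transfer_logs.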
        return get_transfer_run_logs(
            full_run_id,
            next_page_token=page_token,
            page_size=page_size,
            page_iteration=page_iteration + 1,
            transfer_logs=transfer_logs,
        )
    except HttpError as err:
        logging.exception("Error requesting transfer run logs")
        raise


def _get_transfer_run_logs(ti) -> None:
    """
    Calls get_transfer_run_logs to fetch transfer run logs from the DTS API and pushes them to XCom
    """
    full_run_id = ti.xcom_pull(key="full_run_id", task_ids="load_parameters")
    transfer_run_id = ti.xcom_pull(key="transfer_run_id", task_ids="load_parameters")
    transfer_run_logs = get_transfer_run_logs(full_run_id, page_size=10)
    if not transfer_run_logs:
        raise Exception(
            f"Method _get_transfer_run_logs: failed to retrieve transfer run logs for "
            f"transfer_run_id {transfer_run_id}"
        )
    ti.xcom_push(key="dts_run_logs", value=transfer_run_logs)


def get_logging_bucket_name():
    """
    Builds the logging bucket name from LOGGING_BUCKET_ID_PREFIX and the
    CUSTOMER_NAME environment variable
    """
    cust_name = os.environ.get("CUSTOMER_NAME")
    bucket_id = f"{LOGGING_BUCKET_ID_PREFIX}-{cust_name}"
    logging.info(
        f"Method get_logging_bucket_name: retrieved logging bucket name as {bucket_id}"
    )
    return bucket_id


def _create_transfer_log_GCS(ti) -> None:
    """
    Writes the DTS run logs to a file in the Cloud Storage logging bucket and pushes its path to XCom
    """
    bucket_id = get_logging_bucket_name()
    unique_id = ti.xcom_pull(key="unique_id", task_ids="load_parameters")
    transfer_run_id = ti.xcom_pull(key="transfer_run_id", task_ids="load_parameters")
    dts_run_logs = ti.xcom_pull(key="dts_run_logs", task_ids="get_transfer_run_logs")
    log_file_name = f"{unique_id}/{transfer_run_id}.json"
    formatted_logs = "\n".join(json.dumps(log) for log in dts_run_logs)
    log_file_path = gcs_util.write_object_in_gcsbucket(
        bucket_id, log_file_name, formatted_logs
    )
    logging.info(
        f"Method _create_transfer_log_GCS: written transfer run logs in GCS path: {log_file_path}"
    )
    ti.xcom_push(key="dts_log_file_path", value=log_file_path)


def _process_transfer_logs(ti) -> None:
    """
    Processes DTS run logs and extracts the relevant information for each table's migration
    """
    transfer_config_id = ti.xcom_pull(
        key="transfer_config_id", task_ids="load_parameters"
    )
    transfer_run_id = ti.xcom_pull(key="transfer_run_id", task_ids="load_parameters")
    unique_id = ti.xcom_pull(key="unique_id", task_ids="load_parameters")
    dts_run_summary_json = ti.xcom_pull(
        key="dts_run_summary_json", task_ids="get_transfer_run_summary"
    )
    job_stats_json_template = ti.xcom_pull(
        key="job_stats_json", task_ids="load_parameters"
    )
    dts_run_logs = ti.xcom_pull(key="dts_run_logs", task_ids="get_transfer_run_logs")

    job_stats_json_template["unique_id"] = unique_id
    job_stats_json_template["transfer_run_id"] = transfer_run_id
    job_stats_json_template["transfer_config_id"] = transfer_config_id

    job_stats_jsons = {}

    for log_dict in dts_run_logs:
        log_message = log_dict["messageText"]
        try:
            # Parsing error messages
            if log_dict["severity"] == "ERROR":
                job_stats_json_template["job_status"] = "FAILED"
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
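
The gcs_util.write_object_in_gcsbucket helper used in both files is project-internal, and its behavior is only inferred from the call sites above. A plausible equivalent using the google-cloud-storage client; the function body below is an assumption, not the project's implementation:

from google.cloud import storage


def write_object_in_gcsbucket(bucket_id, object_name, content):
    # Upload the newline-delimited JSON logs as a single object and return the
    # gs:// path, matching how the DAGs log and XCom-push the returned value.
    client = storage.Client()
    blob = client.bucket(bucket_id).blob(object_name)
    blob.upload_from_string(content, content_type="application/json")
    return f"gs://{bucket_id}/{object_name}"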



