src/datamigration/dags/redshift/redshift_data_load_dag.py [80:158]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    if "unique_id" not in user_config:
        user_config["unique_id"] = user_config["transfer_config"]["displayName"]

    ti.xcom_push(key="user_config", value=user_config)
    ti.xcom_push(
        key=VALIDATION_UNIQUE_ID_XCOM, value=user_config.get(VALIDATION_UNIQUE_ID_CONF)
    )
    ti.xcom_push(
        key=BATCH_DISTRIBUTION_XCOM, value=user_config.get(BATCH_DISTRIBUTION_CONF)
    )
    ti.xcom_push(key=TABLE_LIST_FILE_XCOM, value=user_config.get(TABLE_LIST_FILE_CONF))
    ti.xcom_push(key=CONFIG_FILE_BUCKET_XCOM, value=bucket_name)
    ti.xcom_push(key=CONFIG_FILE_OBJECT_XCOM, value=object_name)


def _get_table_list(ti):
    """
    Resolve the list of source tables to migrate and push it to XCom.

    Two mutually exclusive sources, checked in order:
      * table_list_file set in config -> download the GCS file and take one
        table name per non-blank line.
      * unique_id set in config       -> query the DVT validation-results
        table for tables whose schema validation run had zero failing rows.

    Pushes the resulting list under TABLE_LIST_XCOM.

    :param ti: Airflow task instance, used for XCom pull/push
    :raises ValueError: if neither table_list_file nor unique_id is present
    """
    unique_id = ti.xcom_pull(key=VALIDATION_UNIQUE_ID_XCOM, task_ids=GET_CONFIG_TASK)
    table_list_file = ti.xcom_pull(key=TABLE_LIST_FILE_XCOM, task_ids=GET_CONFIG_TASK)

    if table_list_file:
        gcs = GCSHook()
        bucket, blob = gcs_util.parse_bucket_and_blob_from_path(table_list_file)
        file_list_blob = gcs.download_as_byte_array(
            bucket_name=bucket, object_name=blob
        )

        # Drop blank/whitespace-only lines so a trailing newline in the file
        # does not produce an empty table name downstream.
        table_list = [
            line.strip()
            for line in file_list_blob.decode("utf-8").splitlines()
            if line.strip()
        ]

    elif unique_id:
        # NOTE(review): unique_id comes from the user config file and is
        # interpolated directly into the SQL string below — consider BigQuery
        # query parameters to rule out SQL injection.
        results = bq_client.query(
            f"""
      SELECT DISTINCT
        SPLIT(source_table_name, '.')[OFFSET(1)] as source_table_name
      FROM
        `{PROJECT_ID}.{LOGS_DATASET}.{DVT_TABLE_NAME}`
      CROSS JOIN
        UNNEST(labels) AS a
      WHERE
        validation_type='Schema'
        AND a.value='{unique_id}'
      GROUP BY source_table_name, run_id
      HAVING
        SUM(IF(validation_status='fail', 1, 0)) = 0;
      """
        )

        table_list = [row["source_table_name"] for row in results]
    else:
        # ValueError is a subclass of Exception, so existing callers that
        # catch Exception remain unaffected.
        raise ValueError(
            f"{VALIDATION_UNIQUE_ID_CONF} or {TABLE_LIST_FILE_CONF} must be present on config file"
        )

    ti.xcom_push(key=TABLE_LIST_XCOM, value=table_list)


def _generate_batches(ti):
    """
    Split the resolved table list into batches for parallel transfer runs.

    Pulls the batch distribution and table list from upstream XComs, groups
    tables via parallelization_utils.make_run_batches, pushes the joined
    per-batch table-name strings under "batch_table_names_list", and returns
    one kwargs dict per batch for dynamic task expansion.

    :param ti: Airflow task instance, used for XCom pull/push
    :return: list of {"batch_idx": i} dicts, one per generated batch
    """
    batch_distribution = ti.xcom_pull(
        key=BATCH_DISTRIBUTION_XCOM, task_ids=GET_CONFIG_TASK
    )
    table_list = ti.xcom_pull(key=TABLE_LIST_XCOM, task_ids=GET_TABLE_LIST_TASK)

    batch_table_names_list = []
    run_id_prefix = timezone.utcnow()
    for batch_run_id, batch in parallelization_utils.make_run_batches(
        table_list, batch_distribution, run_id_prefix
    ):
        batch_table_names = DTS_TABLE_NAME_SEPARATOR.join(batch)
        # Lazy %-args so the message is only formatted when INFO is enabled.
        logging.info("run_id: %s ==> tables: %s", batch_run_id, batch_table_names)
        batch_table_names_list.append(batch_table_names)
    ti.xcom_push(key="batch_table_names_list", value=batch_table_names_list)
    return [{"batch_idx": i} for i in range(len(batch_table_names_list))]


def _create_bq_transfer_config_json(batch_idx, ti) -> None:
    """
    This function will create the JSON required to create transfer config on BigQuery
    :param ti: task instance parameter
    """
    user_config = ti.xcom_pull(key="user_config", task_ids=GET_CONFIG_TASK)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



src/datamigration/dags/teradata/teradata_data_load_dag.py [104:182]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        if "unique_id" not in user_config:
            user_config["unique_id"] = user_config["transfer_config"]["displayName"]

    ti.xcom_push(key="user_config", value=user_config)
    ti.xcom_push(
        key=VALIDATION_UNIQUE_ID_XCOM, value=user_config.get(VALIDATION_UNIQUE_ID_CONF)
    )
    ti.xcom_push(
        key=BATCH_DISTRIBUTION_XCOM, value=user_config.get(BATCH_DISTRIBUTION_CONF)
    )
    ti.xcom_push(key=TABLE_LIST_FILE_XCOM, value=user_config.get(TABLE_LIST_FILE_CONF))
    ti.xcom_push(key=CONFIG_FILE_BUCKET_XCOM, value=bucket_name)
    ti.xcom_push(key=CONFIG_FILE_OBJECT_XCOM, value=object_name)


def _get_table_list(ti):
    """
    Resolve the list of source tables to migrate and push it to XCom.

    Two mutually exclusive sources, checked in order:
      * table_list_file set in config -> download the GCS file and take one
        table name per non-blank line.
      * unique_id set in config       -> query the DVT validation-results
        table for tables whose schema validation run had zero failing rows.

    Pushes the resulting list under TABLE_LIST_XCOM.

    :param ti: Airflow task instance, used for XCom pull/push
    :raises ValueError: if neither table_list_file nor unique_id is present
    """
    unique_id = ti.xcom_pull(key=VALIDATION_UNIQUE_ID_XCOM, task_ids=GET_CONFIG_TASK)
    table_list_file = ti.xcom_pull(key=TABLE_LIST_FILE_XCOM, task_ids=GET_CONFIG_TASK)

    if table_list_file:
        gcs = GCSHook()
        bucket, blob = gcs_util.parse_bucket_and_blob_from_path(table_list_file)
        file_list_blob = gcs.download_as_byte_array(
            bucket_name=bucket, object_name=blob
        )

        # Drop blank/whitespace-only lines so a trailing newline in the file
        # does not produce an empty table name downstream.
        table_list = [
            line.strip()
            for line in file_list_blob.decode("utf-8").splitlines()
            if line.strip()
        ]

    elif unique_id:
        # NOTE(review): unique_id comes from the user config file and is
        # interpolated directly into the SQL string below — consider BigQuery
        # query parameters to rule out SQL injection.
        results = bq_client.query(
            f"""
      SELECT DISTINCT
        SPLIT(source_table_name, '.')[OFFSET(1)] as source_table_name
      FROM
        `{PROJECT_ID}.{LOGS_DATASET}.{DVT_TABLE_NAME}`
      CROSS JOIN
        UNNEST(labels) AS a
      WHERE
        validation_type='Schema'
        AND a.value='{unique_id}'
      GROUP BY source_table_name, run_id
      HAVING
        SUM(IF(validation_status='fail', 1, 0)) = 0;
      """
        )

        table_list = [row["source_table_name"] for row in results]
    else:
        # ValueError is a subclass of Exception, so existing callers that
        # catch Exception remain unaffected.
        raise ValueError(
            f"{VALIDATION_UNIQUE_ID_CONF} or {TABLE_LIST_FILE_CONF} must be present on config file"
        )

    ti.xcom_push(key=TABLE_LIST_XCOM, value=table_list)


def _generate_batches(ti):
    """
    Split the resolved table list into batches for parallel transfer runs.

    Pulls the batch distribution and table list from upstream XComs, groups
    tables via parallelization_utils.make_run_batches, pushes the joined
    per-batch table-name strings under "batch_table_names_list", and returns
    one kwargs dict per batch for dynamic task expansion.

    :param ti: Airflow task instance, used for XCom pull/push
    :return: list of {"batch_idx": i} dicts, one per generated batch
    """
    batch_distribution = ti.xcom_pull(
        key=BATCH_DISTRIBUTION_XCOM, task_ids=GET_CONFIG_TASK
    )
    table_list = ti.xcom_pull(key=TABLE_LIST_XCOM, task_ids=GET_TABLE_LIST_TASK)

    batch_table_names_list = []
    run_id_prefix = timezone.utcnow()
    for batch_run_id, batch in parallelization_utils.make_run_batches(
        table_list, batch_distribution, run_id_prefix
    ):
        batch_table_names = DTS_TABLE_NAME_SEPARATOR.join(batch)
        # Lazy %-args so the message is only formatted when INFO is enabled.
        logging.info("run_id: %s ==> tables: %s", batch_run_id, batch_table_names)
        batch_table_names_list.append(batch_table_names)
    ti.xcom_push(key="batch_table_names_list", value=batch_table_names_list)
    return [{"batch_idx": i} for i in range(len(batch_table_names_list))]


def _create_bq_transfer_config_json(batch_idx, ti) -> None:
    """
    This function will create the JSON required to create transfer config on BigQuery
    :param ti: task instance parameter
    """
    user_config = ti.xcom_pull(key="user_config", task_ids=GET_CONFIG_TASK)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



