data-mesh-banking-labs/setup/resources/composer/dags/create_customer_data_exchange_tag.py [46:207]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
TAG_MAIN_CLASS = models.Variable.get('data_exchange_main_class')

#'https://dataplex.googleapis.com'


# --------------------------------------------------------------------------------
# Set default arguments
# --------------------------------------------------------------------------------


yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_args = {
    'owner': 'airflow',
    'start_date': yesterday,
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
}

# --------------------------------------------------------------------------------
# Set task specific variables
# --------------------------------------------------------------------------------


#projects/mdm-dg/locations/us-central1/tagTemplates/data_product_data_quality"

# --------------------------------------------------------------------------------
# Set GCP logging
# --------------------------------------------------------------------------------

logger = logging.getLogger('customer_merchant_dp_info_tag')

def get_uuid():
    return str(uuid.uuid4())

def read_entities_list(table_list_file):
    """
    Reads the table list file that will help in creating Airflow tasks in
    the DAG dynamically.
    :param table_list_file: (String) The file location of the table list file,
    e.g. '/home/airflow/framework/table_list.csv'
    :return table_list: (List) List of dicts, one per row, each holding the
    entity name read from the file.
    """
    table_list = []
    logger.info('Reading table_list_file from : %s' % str(table_list_file))
    try:
        with io.open(table_list_file, 'rt', encoding='utf-8') as csv_file:
            csv_reader = csv.reader(csv_file)
            next(csv_reader)  # skip the headers
            for row in csv_reader:
                logger.info(row)
                table_tuple = {
                    'entity_name': row[0]
                }
                table_list.append(table_tuple)
            return table_list
    except IOError as e:
        logger.error('Error opening table_list_file %s: %s', table_list_file, e)
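# Example (illustrative only -- entity names below are hypothetical, not from the labs):
# the table list CSV is expected to have a header row followed by one entity per line, e.g.
#   entity_name
#   customer_data
#   credit_card_data
# read_entities_list('/home/airflow/framework/table_list.csv') would then return
# [{'entity_name': 'customer_data'}, {'entity_name': 'credit_card_data'}].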

def read_entity_list(entity_list_file):
    """
    Reads the table list file that will help in creating Airflow tasks in
    the DAG dynamically.
    :param entity_list_file: (String) The file location of the table list file,
    e.g. '/home/airflow/framework/entity_list.csv'
    :return entity_list: (List) List of tuples containing the source and
    target tables.
    """
    entity_list = []
    logger.info('Reading entity_list_file from : %s' % str(entity_list_file))
    try:
        with io.open(entity_list_file, 'rt', encoding='utf-8') as file:
            for row in file:
                # strip the trailing newline so entity names are returned clean
                entity_list.append(row.strip())
            return entity_list
    except IOError as e:
        logger.error('Error opening entity_list_file %s: %s', entity_list_file, e)


def get_clouddq_task_status(task_id):
    """
    This method will return the job status for the task.
    Args:
    Returns: str
    """
    session, headers = get_session_headers()
    res = session.get(
        f"{DATAPLEX_ENDPOINT}/v1/projects/{PROJECT_ID_DG}/locations/{DATAPLEX_REGION}/lakes/{LAKE_ID}/tasks/{task_id}/jobs", headers=headers)
    print(res.status_code)
    print(res.text)
    resp_obj = json.loads(res.text)
    if res.status_code == 200:
        if (
            "jobs" in resp_obj
            and len(resp_obj["jobs"]) > 0
            and "state" in resp_obj["jobs"][0]
        ):
            task_status = resp_obj["jobs"][0]["state"]
            return task_status
        # no job (or no state) reported yet: fall through and return None so the caller keeps polling
    else:
        return "FAILED"


def get_session_headers():
    """
    This method is to get the session and headers object for authenticating the api requests using credentials.
    Args:
    Returns: tuple
    """
    # getting the credentials and project details for gcp project
    credentials, your_project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])

    # getting request object
    auth_req = google.auth.transport.requests.Request()

    print(credentials.valid)  # prints False before the token refresh
    credentials.refresh(auth_req)  # refresh token
    # check that the credentials are now valid
    print(credentials.valid)  # prints True after the token refresh
    auth_token = credentials.token
    print(auth_token)

    headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + auth_token
    }

    # build the session outside a "with" block so it is still open when returned to the caller
    session = requests.Session()
    session.auth = OAuth2BearerToken(auth_token)

    return (session, headers)
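# Typical use (as in get_clouddq_task_status above):
#   session, headers = get_session_headers()
#   res = session.get(f"{DATAPLEX_ENDPOINT}/v1/...", headers=headers)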


def _get_dataplex_job_state(**kwargs):
    """
    This method will try to get the status of the job till it is in either 'SUCCEEDED' or 'FAILED' state.
    Args:
    Returns: str
    """
    task_status = get_clouddq_task_status(kwargs['dplx_task_id'])
    while (task_status != 'SUCCEEDED' and task_status != 'FAILED'):
        print(time.ctime())
        time.sleep(30)
        task_status = get_clouddq_task_status(kwargs['dplx_task_id'])
        print(f"Cloud Dataplex task status is {task_status}")
    return task_status + "_{}".format(kwargs['entity_val'])
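# A minimal sketch of how this callable is typically wired into the DAG defined below
# (the DAG body is truncated in this excerpt; task_id and op_kwargs values are illustrative):
#   dataplex_task_state = PythonOperator(
#       task_id='dataplex_task_state_check',
#       python_callable=_get_dataplex_job_state,
#       op_kwargs={'dplx_task_id': 'my-dataplex-task', 'entity_val': 'my_entity'},
#   )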


with models.DAG(
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



data-mesh-banking-labs/setup/resources/composer/dags/create_merchants_data_exchange_tag.py [46:207]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
TAG_MAIN_CLASS = models.Variable.get('data_exchange_main_class')

#'https://dataplex.googleapis.com'


# --------------------------------------------------------------------------------
# Set default arguments
# --------------------------------------------------------------------------------


yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_args = {
    'owner': 'airflow',
    'start_date': yesterday,
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
}

# --------------------------------------------------------------------------------
# Set task specific variables
# --------------------------------------------------------------------------------


#projects/mdm-dg/locations/us-central1/tagTemplates/data_product_data_quality"

# --------------------------------------------------------------------------------
# Set GCP logging
# --------------------------------------------------------------------------------

logger = logging.getLogger('customer_merchant_dp_info_tag')

def get_uuid():
    return str(uuid.uuid4())

def read_entities_list(table_list_file):
    """
    Reads the table list file that will help in creating Airflow tasks in
    the DAG dynamically.
    :param table_list_file: (String) The file location of the table list file,
    e.g. '/home/airflow/framework/table_list.csv'
    :return table_list: (List) List of dicts, one per row, each holding the
    entity name read from the file.
    """
    table_list = []
    logger.info('Reading table_list_file from : %s' % str(table_list_file))
    try:
        with io.open(table_list_file, 'rt', encoding='utf-8') as csv_file:
            csv_reader = csv.reader(csv_file)
            next(csv_reader)  # skip the headers
            for row in csv_reader:
                logger.info(row)
                table_tuple = {
                    'entity_name': row[0]
                }
                table_list.append(table_tuple)
            return table_list
    except IOError as e:
        logger.error('Error opening table_list_file %s: %s', table_list_file, e)

def read_entity_list(entity_list_file):
    """
    Reads the table list file that will help in creating Airflow tasks in
    the DAG dynamically.
    :param entity_list_file: (String) The file location of the table list file,
    e.g. '/home/airflow/framework/entity_list.csv'
    :return entity_list: (List) List of tuples containing the source and
    target tables.
    """
    entity_list = []
    logger.info('Reading entity_list_file from : %s' % str(entity_list_file))
    try:
        with io.open(entity_list_file, 'rt', encoding='utf-8') as file:
            for row in file:
                # strip the trailing newline so entity names are returned clean
                entity_list.append(row.strip())
            return entity_list
    except IOError as e:
        logger.error('Error opening entity_list_file %s: %s', entity_list_file, e)
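# Example (illustrative only -- entity names below are hypothetical, not from the labs):
# unlike the CSV above, the entity list file is read as plain text with one entity name
# per line and no header, e.g.
#   merchants_data
#   mcc_codes
# read_entity_list('/home/airflow/framework/entity_list.csv') would then return
# ['merchants_data', 'mcc_codes'].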


def get_clouddq_task_status(task_id):
    """
    This method will return the job status for the task.
    Args:
    Returns: str
    """
    session, headers = get_session_headers()
    res = session.get(
        f"{DATAPLEX_ENDPOINT}/v1/projects/{PROJECT_ID_DG}/locations/{DATAPLEX_REGION}/lakes/{LAKE_ID}/tasks/{task_id}/jobs", headers=headers)
    print(res.status_code)
    print(res.text)
    resp_obj = json.loads(res.text)
    if res.status_code == 200:
        if (
            "jobs" in resp_obj
            and len(resp_obj["jobs"]) > 0
            and "state" in resp_obj["jobs"][0]
        ):
            task_status = resp_obj["jobs"][0]["state"]
            return task_status
        # no job (or no state) reported yet: fall through and return None so the caller keeps polling
    else:
        return "FAILED"


def get_session_headers():
    """
    This method is to get the session and headers object for authenticating the api requests using credentials.
    Args:
    Returns: tuple
    """
    # getting the credentials and project details for gcp project
    credentials, your_project_id = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])

    # getting request object
    auth_req = google.auth.transport.requests.Request()

    print(credentials.valid)  # prints False before the token refresh
    credentials.refresh(auth_req)  # refresh token
    # check that the credentials are now valid
    print(credentials.valid)  # prints True after the token refresh
    auth_token = credentials.token
    print(auth_token)

    headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + auth_token
    }

    # build the session outside a "with" block so it is still open when returned to the caller
    session = requests.Session()
    session.auth = OAuth2BearerToken(auth_token)

    return (session, headers)


def _get_dataplex_job_state(**kwargs):
    """
    This method will try to get the status of the job till it is in either 'SUCCEEDED' or 'FAILED' state.
    Args:
    Returns: str
    """
    task_status = get_clouddq_task_status(kwargs['dplx_task_id'])
    while (task_status != 'SUCCEEDED' and task_status != 'FAILED'):
        print(time.ctime())
        time.sleep(30)
        task_status = get_clouddq_task_status(kwargs['dplx_task_id'])
        print(f"Cloud Dataplex task status is {task_status}")
    return task_status + "_{}".format(kwargs['entity_val'])


with models.DAG(
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



