data-mesh-banking-labs/setup/resources/composer/dags/etl_transactions_analytics_process.py (53 lines of code) (raw):

"""Airflow DAG that loads the credit-card transaction analytics data product.

The DAG ``etl-transactions-analytics-process`` runs two BigQuery jobs in
sequence:

1. Create the partitioned table
   ``cc_analytics_data_product.credit_card_transaction_data`` in the data
   warehouse project if it does not exist yet.
2. Insert into that table the rows of ``auth_data_product.auth_table`` for
   one ``ingest_date`` partition, left-joined with
   ``customer_data_product.cc_customer_data`` and
   ``merchants_data_product.core_merchants``.

All project ids, the service account to impersonate and the partition date
are read from Airflow Variables when this module is parsed. The DAG has no
schedule (``schedule_interval=None``), so it only runs when triggered.
"""

import datetime

from airflow import models
from airflow.operators import bash
from airflow.operators import email
from airflow.providers.google.cloud.operators import bigquery
from airflow.providers.google.cloud.transfers import bigquery_to_gcs
from airflow.utils import trigger_rule
from airflow.models.baseoperator import chain

# Deployment settings, resolved from Airflow Variables at DAG-parse time.
IMPERSONATION_CHAIN = models.Variable.get('gcp_transactions_consumer_sa_acct')
REGION = models.Variable.get('gcp_project_region')
PROJECT_ID_DW = models.Variable.get('gcp_dw_project')
PROJECT_ID_DG = models.Variable.get('gcp_dg_project')
# Value compared against auth.ingest_date in the INSERT query below.
partition_date = models.Variable.get('transactions_partition_date')

# DDL for the target table; IF NOT EXISTS makes the statement safe to re-run.
CREATE_DP_CC_TRANS_ANALYTICS_DP = f"""
CREATE TABLE IF NOT EXISTS `{PROJECT_ID_DW}.cc_analytics_data_product.credit_card_transaction_data`
(
    originating_event STRUCT<
        card_read_type STRUCT<code INT, entry_mode STRING>,
        trans_type STRUCT<trans_type INT64, value STRING>,
        payment_method STRUCT<pym_type_code INT64, pymt_name STRING>,
        swipe_type STRUCT<swipe_code INT64, swipe_value STRING>,
        trans_start_ts TIMESTAMP,
        trans_end_ts TIMESTAMP,
        amount STRUCT<trans_amount STRING, trans_currency STRING>,
        authorization_response STRUCT<
            trans_auth_code INT64,
            trans_auth_date FLOAT64,
            origination INT64,
            is_pin_entry INT64,
            is_signed INT64,
            is_unattended INT64>,
        event_ids STRING,
        event STRING>,
    card_info STRUCT<
        cc_token STRING,
        cc_number INT64,
        cc_expiry STRING,
        cc_provider STRING,
        cc_ccv INT64,
        cc_card_type STRING>,
    merchant_info STRUCT<
        merchant_id STRING,
        merchant_name STRING,
        mcc INT64,
        email STRING,
        street STRING,
        city STRING,
        state STRING,
        country STRING,
        zip STRING,
        latitude FLOAT64,
        longitude FLOAT64,
        owner_id STRING,
        owner_name STRING,
        terminal_ids STRING>,
    customer_info STRUCT<client_id STRING>,
    version INT64,
    ingest_date DATE
)
PARTITION BY ingest_date;
"""

# DML that fills one ingest_date partition. The STRUCT field order and names
# mirror the schema declared in CREATE_DP_CC_TRANS_ANALYTICS_DP above.
INSERT_DP_CC_TRANS_DATA = f"""
INSERT INTO `{PROJECT_ID_DW}.cc_analytics_data_product.credit_card_transaction_data`
(originating_event, card_info, merchant_info, customer_info, version, ingest_date)
SELECT
    STRUCT(
        STRUCT(card_read_type as code, entry_mode) AS card_read_type,
        STRUCT(trans_type, value) AS trans_type,
        STRUCT(payment_method as pym_type_code, pymt_name) AS payment_method,
        STRUCT(swipe_code, swipe_value) AS swipe_type,
        trans_start_ts,
        trans_end_ts,
        STRUCT(trans_amount, trans_currency) AS amount,
        STRUCT(
            trans_auth_code,
            trans_auth_date,
            origination,
            is_pin_entry,
            is_signed,
            is_unattended) AS authorization_response,
        event_ids,
        event
    ) AS originating_event,
    STRUCT(
        auth.cc_token AS cc_token,
        cc_cust.cc_number AS cc_number,
        cc_cust.cc_expiry AS cc_expiry,
        cc_cust.cc_provider AS cc_provider,
        cc_cust.cc_ccv AS cc_ccv,
        cc_cust.cc_card_type AS cc_card_type
    ) AS card_info,
    STRUCT(
        merch.merchant_id AS merchant_id,
        merch.merchant_name AS merchant_name,
        merch.mcc AS mcc,
        merch.email AS email,
        merch.street AS street,
        merch.city AS city,
        merch.state AS state,
        merch.country AS country,
        merch.zip AS zip,
        merch.latitude AS latitude,
        merch.longitude AS longitude,
        merch.owner_id AS owner_id,
        merch.owner_name AS owner_name,
        merch.terminal_ids AS terminal_ids
    ) AS merchant_info,
    STRUCT(client_id) AS customer_info,
    NULL AS version,
    auth.ingest_date as ingest_date
FROM `{PROJECT_ID_DW}.auth_data_product.auth_table` auth
LEFT OUTER JOIN `{PROJECT_ID_DW}.customer_data_product.cc_customer_data` cc_cust
    ON auth.cc_token=cc_cust.token
LEFT OUTER JOIN `{PROJECT_ID_DW}.merchants_data_product.core_merchants` merch
    ON auth.merchant_id = merch.merchant_id
where auth.ingest_date='{partition_date}';
"""

# Midnight of the previous day, recomputed on every parse of this file.
# NOTE(review): Airflow recommends a fixed start_date; this is kept unchanged
# because the DAG is unscheduled and catchup is disabled.
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

# Arguments applied by default to the tasks of the DAG. E-mail notifications
# and retries are switched off.
default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': datetime.timedelta(minutes=1),
    'project_id': PROJECT_ID_DG,
    'region': REGION,
}

with models.DAG(
        'etl-transactions-analytics-process',
        catchup=False,
        schedule_interval=None,
        default_args=default_dag_args) as dag:

    # Step 1: make sure the target table exists.
    bq_create_cc_analytics_dp_tbl = bigquery.BigQueryInsertJobOperator(
        task_id="bq_create_cc_analytics_dp_tbl",
        impersonation_chain=IMPERSONATION_CHAIN,
        configuration={
            "query": {
                "query": CREATE_DP_CC_TRANS_ANALYTICS_DP,
                "useLegacySql": False
            }
        }
    )

    # Step 2: load the joined transaction rows for the configured partition.
    bq_insert_cc_analytics_tbl = bigquery.BigQueryInsertJobOperator(
        task_id="bq_insert_cc_analytics_tbl",
        impersonation_chain=IMPERSONATION_CHAIN,
        configuration={
            "query": {
                "query": INSERT_DP_CC_TRANS_DATA,
                "useLegacySql": False
            }
        }
    )

    # The insert must run only after the table has been created. chain() takes
    # the tasks as separate arguments and wires them in that order.
    chain(bq_create_cc_analytics_dp_tbl, bq_insert_cc_analytics_tbl)