# airflow/dags_template/generic-api-service.py
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import datetime
import json
import logging
import pendulum
from airflow.sensors.time_delta import TimeDeltaSensor
from airflow import DAG
# from airflow.providers.google.cloud.operators.dataform import DataformRunOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator, BigQueryCreateEmptyTableOperator, BigQueryUpdateTableOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.sensors.tasks import TaskQueueEmptySensor
from airflow.providers.google.cloud.operators.tasks import CloudTasksQueueCreateOperator, CloudTasksQueueDeleteOperator
from airflow.operators.dummy import DummyOperator
from google.cloud.tasks_v2.types import Queue
logger = logging.getLogger(__name__)
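# Deployment-specific settings are read from environment variables, which are expected to be
# set on the Composer/Airflow environment (for example by the provisioning pipeline).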
LOD_PRJ = os.environ.get('LOD_PRJ')
BQ_LOCATION = os.environ.get('BQ_LOCATION')
LOD_SA = os.environ.get('LOD_SA')
LOD_BQ_DATASET = os.environ.get('LOD_BQ_DATASET')
REGION = os.environ.get("GCP_REGION")
LOD_GCS_STAGING = os.environ.get("LOD_GCS_STAGING")
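# ${WORKFLOW_CONFIG} and ${WORKFLOW_KEY} appear to be placeholders substituted when this DAG
# template is rendered for a specific workflow.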
workflow_config = json.loads("""${WORKFLOW_CONFIG}""")
config_dag_params = workflow_config.get('airflow_dag_config')
config_dag_params['default_args'] = {'owner': 'airflow'}
config_dag_params.setdefault('dag_id', "api_workflow_${WORKFLOW_KEY}")
config_dag_params.setdefault('start_date', datetime.datetime.now() - datetime.timedelta(minutes=1))
api_config = workflow_config.get('api_config')
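# Parameters for GCSToBigQueryOperator come from the workflow config; the destination table is
# always derived at run time (see the DAG below), so any configured value is discarded.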
gcs_to_bigquery = workflow_config.get('airflow_gcs_to_bigquery_config')
gcs_to_bigquery.pop('destination_project_dataset_table',None)
gcs_to_bigquery.setdefault('task_id','gcs_to_bigquery_execute')
gcs_to_bigquery.setdefault('bucket', LOD_GCS_STAGING)
gcs_to_bigquery['bucket'] = gcs_to_bigquery['bucket'].replace('gs://','')
gcs_to_bigquery.setdefault('impersonation_chain', LOD_SA)
workflow_id = workflow_config.get('name')
request_config = api_config.get('request_config',{})
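# "static" entries are literal values sent with every request, while "dynamic" entries reference
# columns of the ingested source table and are resolved per row.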
static_headers = request_config.get('static_data',{}).get('headers', {})
static_query_string = request_config.get('static_data',{}).get('query_string', {})
static_body = request_config.get('static_data',{}).get('body', {})
dynamic_headers = request_config.get('dynamic_data',{}).get('headers', {})
dynamic_query_string = request_config.get('dynamic_data',{}).get('query_string', {})
dynamic_body = request_config.get('dynamic_data',{}).get('body', {})
request_config.pop('dynamic_data',None)
request_config.pop('static_data',None)
def get_expiration_time(seconds=86400):
"""Return an expiration timestamp, in epoch milliseconds, `seconds` from now (default: 24 hours)."""
expiration_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
seconds=seconds
)
return int(expiration_time.timestamp() * 1000)
def run_bq_job(**kwargs):
"""Build and run the BigQuery job that calls the API routine once per row of the ingested data."""
task_instance = kwargs['ti']
def _json_pairs(alias, dynamic_items, static_items):
# Build a SQL CONCAT(...) expression that renders a JSON object: dynamic values become
# backtick-quoted column references, static values become double-quoted string literals.
def _to_query_pairs(items, as_column_reference=False):
q = '`' if as_column_reference else '"'
return [f'\'"{key}":"\', {q}{value}{q},\'"\'' for key, value in items.items()]
query_pairs = _to_query_pairs(dynamic_items, True)
query_pairs.extend(_to_query_pairs(static_items))
json_pairs = ", ' , ',".join(query_pairs)
return "CONCAT('{'," + json_pairs + ",'}') AS " + alias
secret_name = api_config.get('auth').get('secret_name', 'none')
sql_query_secret = '"{}" as secret_name'.format(secret_name)
sql_query = _json_pairs('query_string', dynamic_query_string, static_query_string)
sql_body = _json_pairs('body', dynamic_body, static_body)
sql_headers = _json_pairs('headers', dynamic_headers, static_headers)
sql_query_source = ', '.join([sql_query_secret, sql_query, sql_body, sql_headers])
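# The generated statement below reads each row of the per-run ingestion table, renders the
# headers / query string / body as JSON strings, invokes routine_execute_api_fnc (assumed to be
# a BigQuery routine deployed with this pipeline that performs the HTTP call and writes raw
# responses to the result table via the Cloud Tasks queue), and records every invocation in the
# temporary log table.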
sql = """
INSERT INTO `{dataset}.{tmp_log_table}` (
headers,
query_string,
body,
result,
exec_time
)
WITH query_source AS (
SELECT
'{workflow_id}' AS workflow_id,
'{auth}' AS auth,
'{request_config}' AS request_config,
{sql_query_source}
FROM `{source_table}`
)
SELECT
headers,
query_string,
body,
TO_JSON_STRING(
`{dataset}.routine_execute_api_fnc`(
workflow_id,
request_config,
auth,
headers,
query_string,
body,
'{dataset}.{tmp_result_table}',
'{queue_name}'
)
) AS result,
CURRENT_TIMESTAMP() AS exec_time
FROM query_source
""".format(dataset=LOD_BQ_DATASET,
tmp_log_table= task_instance.xcom_pull(task_ids="tmp_log_table", key="bigquery_table")["table_id"],
workflow_id=workflow_id,
tmp_result_table=task_instance.xcom_pull(task_ids="tmp_log_table", key="bigquery_table")["table_id"],
source_table=f"{LOD_PRJ}.{LOD_BQ_DATASET}.lod_ingestion_data_"+task_instance.xcom_pull(task_ids="uuid"),
queue_name=task_instance.xcom_pull(task_ids="create_queue_gct", key="return_value")["name"],
auth=json.dumps(api_config.get('auth'),separators=(',', ':')),
request_config=json.dumps(api_config.get('request_config'),separators=(',', ':')),
sql_query_source=sql_query_source)
logger.info("generated sql: "+sql)
start_run_job_in_bq = BigQueryInsertJobOperator(
task_id='bq_execute_api_fnc',
gcp_conn_id='bigquery_default',
project_id=LOD_PRJ,
location=BQ_LOCATION,
configuration={
'jobType':'QUERY',
'query':{
"priority": "BATCH",
'query':sql,
"useLegacySql": False
}
},
impersonation_chain=[LOD_SA]
)
start_run_job_in_bq.execute(kwargs)
# Start Declare DAG
with DAG(
**config_dag_params
) as dag:
start = DummyOperator(
task_id='start',
trigger_rule='all_success'
)
end = DummyOperator(
task_id='end',
trigger_rule='all_success'
)
#https://cloud.google.com/tasks/docs/creating-queues?hl=en#create_a_queue
# A newly created queue can take a few minutes to become available, so wait about 60 seconds.
# Note: TimeDeltaSensor waits until data_interval_end + delta for the DAG run, which for a
# manually triggered run is roughly 60 seconds after the run starts.
wait60secs = TimeDeltaSensor(task_id="wait_queue_tobe_available", delta=pendulum.duration(seconds=60))
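# Despite the task_id, this produces a timestamp string (not a UUID); it suffixes the per-run
# tables and the Cloud Tasks queue name.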
generate_uuid = PythonOperator(
task_id='uuid',
python_callable=lambda: datetime.datetime.now().strftime("%Y%m%d%H%M%S")
)
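# Load the staged files into a per-run ingestion table suffixed with the run timestamp.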
gcs_to_bigquery_execute = GCSToBigQueryOperator(
destination_project_dataset_table=f"{LOD_PRJ}.{LOD_BQ_DATASET}.lod_ingestion_data_"+'{{ task_instance.xcom_pull(task_ids="uuid") }}',
**gcs_to_bigquery
)
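# Set an expiration on the per-run ingestion table so it is cleaned up automatically.
# Note: this task is defined but not currently wired into the dependency chain below.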
gcs_to_bigquery_table_expiration = BigQueryUpdateTableOperator(
task_id="lod_ingestion_data_set_table_exp",
project_id=LOD_PRJ,
dataset_id=LOD_BQ_DATASET,
table_id='lod_ingestion_data_{{ task_instance.xcom_pull(task_ids="uuid") }}',
fields=["expirationTime"],
table_resource={
"expirationTime": get_expiration_time(),
},
impersonation_chain=[LOD_SA]
)
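# Temporary, auto-expiring log table: one row per API invocation, holding the rendered request
# parts and the routine result.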
create_tmp_log_table = BigQueryCreateEmptyTableOperator(
task_id='tmp_log_table',
project_id=LOD_PRJ,
dataset_id=LOD_BQ_DATASET,
table_id='tbl_process_log_{{ task_instance.xcom_pull(task_ids="uuid") }}',
location=BQ_LOCATION,
exists_ok=False,
table_resource={
"expirationTime": get_expiration_time(),
"schema": {"fields":[
{'name': 'headers', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'query_string', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'body', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'result', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'exec_time', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'}
]}
},
gcp_conn_id='bigquery_default',
impersonation_chain=[LOD_SA],
)
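# Run the generated BigQuery statement that fans out one API call per ingested row.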
run_bq_task = PythonOperator(
task_id='run_bq_task',
python_callable=run_bq_job  # Airflow 2 passes the task context to the callable automatically
)
# Create an empty, auto-expiring table where the raw API responses are stored.
create_result_tmp_final_table = BigQueryCreateEmptyTableOperator(
task_id='tmp_result_table',
project_id=LOD_PRJ,
dataset_id=LOD_BQ_DATASET,
table_id='tbl_result_{{ task_instance.xcom_pull(task_ids="uuid") }}',
location=BQ_LOCATION,
table_resource={
"expirationTime": get_expiration_time(),
"schema": {"fields":[
{'name': 'request', 'type': 'RECORD', 'mode': 'NULLABLE', 'fields': [
{'name': 'uri', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'method', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'auth_type', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'query_string', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'body', 'type': 'STRING', 'mode': 'NULLABLE'}
]},
{'name': 'request_time', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
{'name': 'elapsed_time', 'type': 'FLOAT', 'mode': 'NULLABLE'},
{'name': 'response', 'type': 'RECORD', 'mode': 'NULLABLE', 'fields': [
{'name': 'status_code', 'type': 'INTEGER', 'mode': 'NULLABLE'},
{'name': 'headers', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'body', 'type': 'STRING', 'mode': 'NULLABLE'}
]}
]
}
},
gcp_conn_id='bigquery_default',
impersonation_chain=[LOD_SA],
)
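# Create a dedicated Cloud Tasks queue for this run; its name is passed to the BigQuery routine,
# which presumably uses it to queue and throttle the outgoing API calls.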
create_queue = CloudTasksQueueCreateOperator(
location=REGION,
project_id=LOD_PRJ,
task_queue=Queue(),
queue_name="cloud-task-api-{{ task_instance.xcom_pull(task_ids='uuid') }}",
task_id="create_queue_gct",
impersonation_chain=[LOD_SA],
)
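# Tear down the per-run queue when processing ends; trigger_rule='all_done' makes this run even
# if upstream tasks fail.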
delete_queue = CloudTasksQueueDeleteOperator(
location=REGION,
project_id=LOD_PRJ,
queue_name="cloud-task-api-{{ task_instance.xcom_pull(task_ids='uuid') }}",
task_id="delete_queue_gct",
impersonation_chain=[LOD_SA],
trigger_rule='all_done'
)
# Use TaskQueueEmptySensor to wait until the Cloud Tasks queue is empty
wait_for_empty_queue = TaskQueueEmptySensor(
task_id='wait_for_queue_empty',
project_id=LOD_PRJ,
location=REGION,
queue_name="{{ task_instance.xcom_pull(task_ids='create_queue_gct', key='cloud_task_queue')['queue_id'] }}",
poke_interval=60, # Check every 60 seconds
timeout=3600*6, # Timeout after 6 hour- max time to wait BigQuery
mode='poke', # Block the task until the queue is empty
impersonation_chain=[LOD_SA]
)
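# Task ordering: generate the run id, then (in parallel) load the data, create the temporary
# tables and the queue, wait for the queue to become available, run the BigQuery job, wait for
# the queue to drain, and finally delete the queue.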
start >> \
generate_uuid >> \
[gcs_to_bigquery_execute, create_result_tmp_final_table,create_tmp_log_table,create_queue] >> \
wait60secs >> \
run_bq_task >> \
wait_for_empty_queue >> \
delete_queue >> \
end