cloud-composer-etl/dags/from_data_lake_to_data_warehouse.py

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This example shows how to trigger a data lake DAG and then load the
resulting data into BigQuery.
"""

import os
from datetime import datetime

from airflow import models
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator,
    BigQueryInsertJobOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)

CONN_ID = "pgCitibike"
DATASET_NAME = "citibike"
GCS_DATA_LAKE_BUCKET = os.environ.get("GCS_DATA_LAKE_BUCKET")
PROJECT_ID = os.environ.get("GCP_PROJECT")

with models.DAG(
    dag_id="from_data_lake_to_data_warehouse",
    description="Import data from the data lake to the data warehouse in BigQuery",
    schedule_interval="@once",
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["example", "bigquery"],
) as dag:

    # Run the upstream export DAG for the same logical date and wait for it.
    trigger_datalake_dag = TriggerDagRunOperator(
        task_id="trigger_data_lake_dag",
        trigger_dag_id="from_database_to_data_lake",
        wait_for_completion=True,
        poke_interval=10,  # seconds
        execution_date="{{ execution_date }}",
    )

    # Create the target dataset if it does not exist yet.
    create_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id="create_dataset",
        dataset_id=DATASET_NAME,
        location="US",
    )

    # Load the day's station snapshot from GCS into BigQuery.
    load_stations = GCSToBigQueryOperator(
        task_id="bq_load_stations",
        bucket=GCS_DATA_LAKE_BUCKET,
        source_objects=["citibike/stations/dt={{ ds }}/records.csv"],
        skip_leading_rows=1,
        destination_project_dataset_table="{}.{}".format(DATASET_NAME, "stations"),
        autodetect=True,
        write_disposition="WRITE_TRUNCATE",
    )

    # Load the day's trips from GCS into BigQuery.
    load_trips = GCSToBigQueryOperator(
        task_id="bq_load_trips",
        bucket=GCS_DATA_LAKE_BUCKET,
        source_objects=["citibike/trips/dt={{ ds }}/records.csv"],
        skip_leading_rows=1,
        destination_project_dataset_table="{}.{}".format(DATASET_NAME, "trips"),
        autodetect=True,
        write_disposition="WRITE_TRUNCATE",
    )

    # Denormalize trips and stations into one aggregated table, one row per bike.
    create_bike_trips_table = BigQueryInsertJobOperator(
        task_id="create_bike_trips_table",
        configuration={
            "query": {
                "query": '''SELECT
                        bikeid,
                        COUNT(*) AS trip_count,
                        MAX(starttime) AS last_start,
                        MAX(stoptime) AS last_stop,
                        ARRAY_AGG(
                            STRUCT(
                                ss.name AS start_station_name,
                                t.starttime AS start_time,
                                es.name AS end_station_name,
                                t.stoptime AS end_time,
                                tripduration AS trip_duration)
                        ) AS trips
                    FROM `{0}.citibike.trips` t
                    JOIN `{0}.citibike.stations` AS ss
                        ON (t.start_station_id = ss.station_id)
                    JOIN `{0}.citibike.stations` AS es
                        ON (t.end_station_id = es.station_id)
                    GROUP BY bikeid
                    '''.format(PROJECT_ID),
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": PROJECT_ID,
                    "datasetId": "citibike",
                    "tableId": "bike_trips",
                },
                "createDisposition": "CREATE_IF_NEEDED",
                "writeDisposition": "WRITE_TRUNCATE",
            }
        },
    )

    # Task dependencies: trigger the export first, then create the dataset,
    # load both tables in parallel, and finally build the aggregated table.
    trigger_datalake_dag >> create_dataset
    create_dataset >> [load_stations, load_trips]
    [load_stations, load_trips] >> create_bike_trips_table
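
# ---------------------------------------------------------------------------
# For reference only: a minimal sketch of what the upstream
# "from_database_to_data_lake" DAG triggered above might look like. This is
# an assumption, not the repo's actual code (the real DAG lives in its own
# file under dags/); it only illustrates how the otherwise-unused CONN_ID
# defined above and the GCS paths consumed here (e.g.
# "citibike/stations/dt={{ ds }}/records.csv") could be produced with the
# provider's PostgresToGCSOperator. It reuses `models`, `datetime`, CONN_ID,
# and GCS_DATA_LAKE_BUCKET from this file; kept commented out so this module
# does not register a duplicate dag_id.
#
# from airflow.providers.google.cloud.transfers.postgres_to_gcs import (
#     PostgresToGCSOperator,
# )
#
# with models.DAG(
#     dag_id="from_database_to_data_lake",
#     schedule_interval="@once",
#     start_date=datetime(2022, 1, 1),
#     catchup=False,
# ) as upstream_dag:
#     # Export each table as CSV into a date-partitioned data lake path.
#     for table in ("stations", "trips"):
#         PostgresToGCSOperator(
#             task_id=f"export_{table}",
#             postgres_conn_id=CONN_ID,
#             sql=f"SELECT * FROM {table}",
#             bucket=GCS_DATA_LAKE_BUCKET,
#             filename=f"citibike/{table}/dt={{{{ ds }}}}/records.csv",
#             export_format="csv",
#         )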