# dataplex-quickstart-labs/00-resources/scripts/airflow/chicago-crimes-analytics/bq_lineage_pipeline.py (71 lines of code) (raw):
from airflow import models
from airflow.operators import dummy_operator
from airflow.providers.google.cloud.operators import bigquery
from airflow.utils import trigger_rule
from datetime import datetime
from airflow.utils.dates import days_ago
# Project that owns the destination tables. Read from the Airflow Variable
# 'project_id' at module import time, i.e. every time this file is parsed.
PROJECT_ID = models.Variable.get('project_id')
# Rebuilds oda_curated_zone.crimes_curated from oda_raw_zone.crimes_raw: all raw
# columns plus derived fields (case_year, case_month, case_day_of_month,
# case_hour, case_day_of_week_nbr, case_day_of_week_name) computed from the
# `year` and `date` columns.
# NOTE(review): '%k' is passed to FORMAT_DATE to produce case_hour — this
# presumably relies on `date` carrying a time component; confirm against the
# crimes_raw schema.
# NOTE(review): the source table is not project-qualified, unlike the
# destination — presumably it resolves to the same project; confirm.
CURATE_CHICAGO_CRIMES = f"""
CREATE OR REPLACE TABLE
`{PROJECT_ID}.oda_curated_zone.crimes_curated` AS
SELECT
*,
CAST(year AS Integer) AS case_year,
FORMAT_DATE('%B',date) AS case_month,
FORMAT_DATE('%d',date) AS case_day_of_month,
FORMAT_DATE('%k',date) AS case_hour,
EXTRACT(DAYOFWEEK FROM date) AS case_day_of_week_nbr,
FORMAT_DATE('%A',date) AS case_day_of_week_name
FROM
oda_raw_zone.crimes_raw;
"""
# Rebuilds oda_product_zone.crimes_by_year: row count of the curated table
# grouped by case_year.
TREND_BY_YEAR = f"""
CREATE OR REPLACE TABLE
`{PROJECT_ID}.oda_product_zone.crimes_by_year` AS
SELECT
case_year,
COUNT(*) AS crime_count
FROM
oda_curated_zone.crimes_curated
GROUP BY
case_year;
"""
# Rebuilds oda_product_zone.crimes_by_month: row count of the curated table
# grouped by case_month (exposed as `month`).
TREND_BY_MONTH = f"""
CREATE OR REPLACE TABLE
`{PROJECT_ID}.oda_product_zone.crimes_by_month` AS
SELECT
case_month AS month,
COUNT(*) AS crime_count
FROM
oda_curated_zone.crimes_curated
GROUP BY
case_month;
"""
# Rebuilds oda_product_zone.crimes_by_day: row count of the curated table
# grouped by case_day_of_week_name (exposed as `day`).
TREND_BY_DAY = f"""
CREATE OR REPLACE TABLE
`{PROJECT_ID}.oda_product_zone.crimes_by_day` AS
SELECT
case_day_of_week_name AS day,
COUNT(*) AS crime_count
FROM
oda_curated_zone.crimes_curated
GROUP BY
case_day_of_week_name;
"""
# Rebuilds oda_product_zone.crimes_by_hour: row count of the curated table
# grouped by case_hour (exposed as `hour_of_day`).
TREND_BY_HOUR = f"""
CREATE OR REPLACE TABLE
`{PROJECT_ID}.oda_product_zone.crimes_by_hour` AS
SELECT
case_hour AS hour_of_day,
COUNT(*) AS crime_count
FROM
oda_curated_zone.crimes_curated
GROUP BY
case_hour;
"""
def _bq_query_job(task_id, sql):
    """Return a BigQueryInsertJobOperator that runs ``sql`` as a standard-SQL query job.

    Args:
        task_id: Airflow task id for the operator.
        sql: Query text placed in the job's ``query.query`` field.

    Returns:
        The configured operator. When called inside a ``with models.DAG(...)``
        block the operator is attached to that DAG.
    """
    return bigquery.BigQueryInsertJobOperator(
        task_id=task_id,
        configuration={
            "query": {
                "query": sql,
                "useLegacySql": False,
            }
        },
    )


# Pipeline: curate the raw crimes table, then build the four trend tables from
# the curated table in parallel.
with models.DAG(
    'Chicago_Crime_Trends_From_BQ_With_OOB_Lineage',
    schedule_interval=None,  # no schedule: runs only when triggered
    start_date=days_ago(2),
    catchup=False,
) as dag:
    start = dummy_operator.DummyOperator(
        task_id='start',
        trigger_rule=trigger_rule.TriggerRule.ALL_SUCCESS,
    )
    # ALL_DONE: `end` runs once every trend task has finished, whatever its
    # final state.
    end = dummy_operator.DummyOperator(
        task_id='end',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    curate_chicago_crimes = _bq_query_job("Curate_Chicago_Crimes", CURATE_CHICAGO_CRIMES)

    trend_by_year = _bq_query_job("Trend_By_Year", TREND_BY_YEAR)
    trend_by_month = _bq_query_job("Trend_By_Month", TREND_BY_MONTH)
    trend_by_day = _bq_query_job("Trend_By_Day", TREND_BY_DAY)
    trend_by_hour = _bq_query_job("Trend_By_Hour", TREND_BY_HOUR)

    # The trend queries all read the curated table, so they fan out only after
    # the curation job and fan back in to `end`.
    (
        start
        >> curate_chicago_crimes
        >> [trend_by_year, trend_by_month, trend_by_day, trend_by_hour]
        >> end
    )