datasets/austin_crime/pipelines/crime/crime_dag.py (160 lines of code) (raw):
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
from airflow.providers.google.cloud.transfers import gcs_to_bigquery
# Defaults applied to every task created inside the DAG below.
# start_date is supplied as an ISO-format (YYYY-MM-DD) date string.
default_args = dict(
    owner="Google",
    depends_on_past=False,
    start_date="2021-03-01",
)
# GCS object, inside the Composer bucket, that the transform task uploads and
# the load task reads. Keeping it in one place keeps the two tasks aligned.
_OUTPUT_OBJECT = "data/austin_crime/crime/data_output.csv"

# Jinja template for the `composer_bucket` Airflow Variable; used as both the
# upload target of the transform and the source bucket of the load.
_COMPOSER_BUCKET = "{{ var.value.composer_bucket }}"

# (column name, BigQuery type, description) for every column of the output
# CSV, in file order. This one table produces both the CSV_HEADERS value sent
# to the transform container and the BigQuery schema of the load task, so the
# two cannot drift apart. Every column is loaded with mode "nullable".
_CRIME_COLUMNS = (
    ("unique_key", "integer", "Unique identifier for the record."),
    ("address", "string", "Full address where the incident occurred."),
    ("census_tract", "float", ""),
    ("clearance_date", "timestamp", ""),
    ("clearance_status", "string", ""),
    (
        "council_district_code",
        "integer",
        "Indicates the council district code where the incident occurred.",
    ),
    ("description", "string", "The subcategory of the primary description."),
    (
        "district",
        "string",
        "Indicates the police district where the incident occurred.",
    ),
    ("latitude", "float", ""),
    ("longitude", "float", ""),
    ("location", "string", ""),
    (
        "location_description",
        "string",
        "Description of the location where the incident occurred.",
    ),
    ("primary_type", "string", "The primary description of the NIBRS/UCR code."),
    (
        "timestamp",
        "timestamp",
        "Time when the incident occurred. This is sometimes a best estimate.",
    ),
    (
        "x_coordinate",
        "integer",
        "The x coordinate of the location where the incident occurred",
    ),
    (
        "y_coordinate",
        "integer",
        "The y coordinate of the location where the incident occurred",
    ),
    ("year", "integer", "Indicates the year in which the incident occurred."),
    ("zipcode", "string", "Indicates the zipcode where the incident occurred."),
)

# Daily pipeline: transform the Austin crime CSVs, then load the result into
# BigQuery. max_active_runs=1 and catchup=False mean at most one run is active
# at a time and missed intervals are not backfilled.
with DAG(
    dag_id="austin_crime.crime",
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="@daily",
    catchup=False,
    default_view="graph",
) as dag:
    # Step 1 - run the CSV transform image in a Kubernetes pod. The container
    # is configured only through env_vars. List/map values are passed as
    # JSON-encoded strings; how they are interpreted lives in the image
    # (not shown here).
    austin_crime_transform_csv = kubernetes_pod.KubernetesPodOperator(
        task_id="austin_crime_transform_csv",
        name="crime",
        namespace="composer",
        service_account_name="datasets",
        image_pull_policy="Always",
        image="{{ var.json.austin_crime.container_registry.run_csv_transform_kub }}",
        env_vars={
            "SOURCE_URL": (
                '["gs://pdp-feeds-staging/Austin_Crime/Annual_Crime_2014.csv",'
                '"gs://pdp-feeds-staging/Austin_Crime/Annual_Crime_Dataset_2015.csv",'
                '"gs://pdp-feeds-staging/Austin_Crime/2016_Annual_Crime_Data.csv"]'
            ),
            "SOURCE_FILE": '["files/data1.csv","files/data2.csv","files/data3.csv"]',
            "TARGET_FILE": "files/data_output.csv",
            "TARGET_GCS_BUCKET": _COMPOSER_BUCKET,
            "TARGET_GCS_PATH": _OUTPUT_OBJECT,
            "FILE_PATH": "files/",
            # Compact JSON array of the column names, e.g. ["unique_key",...].
            "CSV_HEADERS": "["
            + ",".join(f'"{column}"' for column, _, _ in _CRIME_COLUMNS)
            + "]",
            # Source column name -> output column name.
            "RENAME_MAPPINGS": (
                '{"GO Primary Key" : "unique_key",'
                '"Council District" : "council_district_code",'
                '"GO Highest Offense Desc" : "description",'
                '"Highest NIBRS/UCR Offense Description" : "primary_type",'
                '"GO Report Date" : "timestamp",'
                '"GO Location" : "location_description",'
                '"Clearance Status" : "clearance_status",'
                '"Clearance Date" : "clearance_date",'
                '"GO District" : "district",'
                '"GO Location Zip" : "zipcode",'
                '"GO Census Tract" : "census_tract",'
                '"GO X Coordinate" : "x_coordinate",'
                '"GO Y Coordinate" : "y_coordinate",'
                '"Location_1" : "temp_address"}'
            ),
        },
        resources={
            "request_memory": "4G",
            "request_cpu": "1",
            "request_ephemeral_storage": "10G",
        },
    )

    # Step 2 - load the transformed CSV into austin_crime.crime, replacing the
    # table contents (WRITE_TRUNCATE). skip_leading_rows=1 drops the first
    # line, presumably the header row written by the transform.
    load_austin_crime_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
        task_id="load_austin_crime_to_bq",
        bucket=_COMPOSER_BUCKET,
        source_objects=[_OUTPUT_OBJECT],
        source_format="CSV",
        destination_project_dataset_table="austin_crime.crime",
        skip_leading_rows=1,
        allow_quoted_newlines=True,
        write_disposition="WRITE_TRUNCATE",
        schema_fields=[
            {
                "name": column,
                "type": bq_type,
                "description": text,
                "mode": "nullable",
            }
            for column, bq_type, text in _CRIME_COLUMNS
        ],
    )

    # The load must wait for the transform to finish.
    austin_crime_transform_csv >> load_austin_crime_to_bq