datasets/cfpb_complaints/pipelines/complaint_database/complaint_database_dag.py (161 lines of code) (raw):
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
from airflow.providers.google.cloud.transfers import gcs_to_bigquery
default_args = {
"owner": "Google",
"depends_on_past": False,
"start_date": "2021-03-01",
}
with DAG(
dag_id="cfpb_complaints.complaint_database",
default_args=default_args,
max_active_runs=1,
schedule_interval="@daily",
catchup=False,
default_view="graph",
) as dag:
# Run CSV transform within kubernetes pod
complaint_database_transform_csv = kubernetes_pod.KubernetesPodOperator(
task_id="complaint_database_transform_csv",
startup_timeout_seconds=600,
name="complaint_database",
namespace="composer",
service_account_name="datasets",
image_pull_policy="Always",
image="{{ var.json.cfpb_complaints.container_registry.run_csv_transform_kub }}",
env_vars={
"SOURCE_URL": "http://files.consumerfinance.gov/ccdb/complaints.csv.zip",
"SOURCE_FILE": "files/data.zip",
"TARGET_FILE": "files/data_output.csv",
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "data/cfpb_complaints/complaint_database/data_output.csv",
"PIPELINE_NAME": "complaint_database",
"CSV_HEADERS": '["date_received","product","subproduct","issue","subissue","consumer_complaint_narrative","company_public_response","company_name","state","zip_code","tags","consumer_consent_provided","submitted_via","date_sent_to_company","company_response_to_consumer","timely_response","consumer_disputed","complaint_id"]',
"RENAME_MAPPINGS": '{"Complaint ID":"complaint_id","Date received":"date_received","Product":"product","Sub-product":"subproduct","Issue":"issue","Sub-issue":"subissue","Consumer complaint narrative":"consumer_complaint_narrative","Company response to consumer":"company_response_to_consumer","Company public response":"company_public_response","Company":"company_name","State":"state","ZIP code":"zip_code","Tags":"tags","Consumer consent provided?":"consumer_consent_provided","Submitted via":"submitted_via","Date sent to company":"date_sent_to_company","Timely response?":"timely_response","Consumer disputed?":"consumer_disputed"}',
},
resources={
"request_memory": "4G",
"request_cpu": "1",
"request_ephemeral_storage": "8G",
},
)
# Task to load CSV data to a BigQuery table
load_complaint_database_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
task_id="load_complaint_database_to_bq",
bucket="{{ var.value.composer_bucket }}",
source_objects=["data/cfpb_complaints/complaint_database/data_output.csv"],
source_format="CSV",
destination_project_dataset_table="cfpb_complaints.complaint_database",
skip_leading_rows=1,
allow_quoted_newlines=True,
write_disposition="WRITE_TRUNCATE",
schema_fields=[
{
"name": "date_received",
"type": "date",
"description": "Date the complaint was received by the CPFB",
"mode": "nullable",
},
{
"name": "product",
"type": "string",
"description": "The type of product the consumer identified in the complaint",
"mode": "nullable",
},
{
"name": "subproduct",
"type": "string",
"description": "The type of sub-product the consumer identified in the complaint",
"mode": "nullable",
},
{
"name": "issue",
"type": "string",
"description": "The issue the consumer identified in the complaint",
"mode": "nullable",
},
{
"name": "subissue",
"type": "string",
"description": "The sub-issue the consumer identified in the complaint",
"mode": "nullable",
},
{
"name": "consumer_complaint_narrative",
"type": "string",
"description": "A description of the complaint provided by the consumer",
"mode": "nullable",
},
{
"name": "company_public_response",
"type": "string",
"description": "The company's optional public-facing response to a consumer's complaint",
"mode": "nullable",
},
{
"name": "company_name",
"type": "string",
"description": "Name of the company identified in the complaint by the consumer",
"mode": "nullable",
},
{
"name": "state",
"type": "string",
"description": "Two letter postal abbreviation of the state of the mailing address provided by the consumer",
"mode": "nullable",
},
{
"name": "zip_code",
"type": "string",
"description": "The mailing ZIP code provided by the consumer",
"mode": "nullable",
},
{
"name": "tags",
"type": "string",
"description": "Data that supports easier searching and sorting of complaints",
"mode": "nullable",
},
{
"name": "consumer_consent_provided",
"type": "string",
"description": "Identifies whether the consumer opted in to publish their complaint narrative",
"mode": "nullable",
},
{
"name": "submitted_via",
"type": "string",
"description": "How the complaint was submitted to the CFPB",
"mode": "nullable",
},
{
"name": "date_sent_to_company",
"type": "date",
"description": "The date the CFPB sent the complaint to the company",
"mode": "nullable",
},
{
"name": "company_response_to_consumer",
"type": "string",
"description": "The response from the company about this complaint",
"mode": "nullable",
},
{
"name": "timely_response",
"type": "boolean",
"description": "Indicates whether the company gave a timely response or not",
"mode": "nullable",
},
{
"name": "consumer_disputed",
"type": "boolean",
"description": "Whether the consumer disputed the company's response",
"mode": "nullable",
},
{
"name": "complaint_id",
"type": "string",
"description": "Unique ID for complaints registered with the CFPB",
"mode": "nullable",
},
],
)
complaint_database_transform_csv >> load_complaint_database_to_bq