datasets/fec/pipelines/opex_2016/opex_2016_dag.py (200 lines of code) (raw):

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Airflow DAG that loads the FEC 2016 operating-expenditures file into BigQuery.

Two tasks run in sequence:

1. ``opex_2016_transform_csv`` runs the ``run_csv_transform_kub`` container
   image in a Kubernetes pod. The container (not defined in this file) is
   configured entirely through environment variables: the source zip URL,
   local working paths, the expected CSV header list, and the Composer bucket
   plus object path it should write its CSV output to.
2. ``load_opex_2016_to_bq`` loads that CSV object into ``fec.opex_2016``
   with ``WRITE_TRUNCATE``, i.e. each run replaces the table contents.
"""

import json

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
from airflow.providers.google.cloud.transfers import gcs_to_bigquery

# Object path inside the Composer bucket. The transform task writes its output
# here and the BigQuery load task reads from here; keeping a single constant
# guarantees the two tasks agree.
_OUTPUT_GCS_OBJECT = "data/fec/opex_2016/data_output.csv"


def _field(name, field_type, description, mode="nullable"):
    """Return one BigQuery schema-field dict as accepted by ``schema_fields``."""
    return {
        "name": name,
        "type": field_type,
        "description": description,
        "mode": mode,
    }


# Destination table schema. Order matters: a CSV load maps file columns to
# schema fields by position, and the CSV header list below is derived from
# this list so the two cannot drift apart.
_SCHEMA_FIELDS = [
    _field("cmte_id", "string", "Filer Identification Number"),
    _field("amndt_ind", "string", "Amendment Indicator"),
    _field("rpt_yr", "integer", "Report Year"),
    _field("rpt_tp", "string", "Report Type"),
    _field("image_num", "integer", "Image Number"),
    _field("line_num", "string", "Line Number"),
    _field("form_tp_cd", "string", "Form Type"),
    _field("sched_tp_cd", "string", "Schedule Type"),
    _field("name", "string", "Contributor/Lender/Transfer Name"),
    _field("city", "string", "City/Town"),
    _field("state", "string", "State"),
    _field("zip_code", "string", "Zip Code"),
    # NOTE(review): BigQuery CSV loads parse DATE values as YYYY-MM-DD, while
    # the description says the FEC source is MMDDYYYY. Presumably the
    # transform container reformats this column -- confirm.
    _field("transaction_dt", "date", "Transaction Date(MMDDYYYY)"),
    _field("transaction_amt", "float", "Transaction Amount"),
    _field("transaction_pgi", "string", "Primary General Indicator"),
    _field("purpose", "string", "Purpose"),
    _field("category", "string", "Disbursement Category Code"),
    _field("category_desc", "string", "Disbursement Category Code Description"),
    _field("memo_cd", "string", "Memo Code"),
    _field("memo_text", "string", "Memo Text"),
    _field("entity_tp", "string", "Entity Type"),
    _field("sub_id", "integer", "FEC Record Number", mode="required"),
    _field("file_num", "integer", "File Number / Report ID"),
    _field("tran_id", "string", "Transaction ID"),
    _field("back_ref_tran_id", "string", "Back Reference Transaction ID"),
]

# JSON array of column names passed to the transform container as CSV_HEADERS,
# in schema order. Compact separators give ["cmte_id","amndt_ind",...].
_CSV_HEADERS = json.dumps(
    [field["name"] for field in _SCHEMA_FIELDS], separators=(",", ":")
)

default_args = {
    "owner": "Google",
    "depends_on_past": False,
    "start_date": "2021-03-01",
}


with DAG(
    dag_id="fec.opex_2016",
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="@daily",
    catchup=False,
    default_view="graph",
) as dag:

    # Run CSV transform within kubernetes pod. Everything the container needs
    # is passed through env_vars; its output object path matches the one the
    # load task reads.
    opex_2016_transform_csv = kubernetes_pod.KubernetesPodOperator(
        task_id="opex_2016_transform_csv",
        startup_timeout_seconds=600,
        name="opex_2016",
        namespace="composer-user-workloads",
        service_account_name="default",
        config_file="/home/airflow/composer_kube_config",
        image_pull_policy="Always",
        image="{{ var.json.fec.container_registry.run_csv_transform_kub }}",
        env_vars={
            "SOURCE_URL": "https://cg-519a459a-0ea3-42c2-b7bc-fa1143481f74.s3-us-gov-west-1.amazonaws.com/bulk-downloads/2016/oppexp16.zip",
            "SOURCE_FILE_ZIP_FILE": "files/zip_file.zip",
            "SOURCE_FILE_PATH": "files/",
            "SOURCE_FILE": "files/oppexp.txt",
            "TARGET_FILE": "files/data_output.csv",
            "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
            "TARGET_GCS_PATH": _OUTPUT_GCS_OBJECT,
            "PIPELINE_NAME": "opex_2016",
            "CSV_HEADERS": _CSV_HEADERS,
        },
    )

    # Task to load CSV data to a BigQuery table. The first row is the header
    # written by the transform and is skipped; WRITE_TRUNCATE replaces the
    # table contents on every run.
    load_opex_2016_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
        task_id="load_opex_2016_to_bq",
        bucket="{{ var.value.composer_bucket }}",
        source_objects=[_OUTPUT_GCS_OBJECT],
        source_format="CSV",
        destination_project_dataset_table="fec.opex_2016",
        skip_leading_rows=1,
        allow_quoted_newlines=True,
        write_disposition="WRITE_TRUNCATE",
        schema_fields=_SCHEMA_FIELDS,
    )

    opex_2016_transform_csv >> load_opex_2016_to_bq