datasets/fec/pipelines/individuals_ingest_2020/pipeline.yaml (1,249 lines of code) (raw):

# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. --- resources: - type: bigquery_table table_id: individuals_ingest_2020 description: "FEC table" dag: airflow_version: 2 initialize: dag_id: individuals_ingest_2020 default_args: owner: "Google" depends_on_past: False start_date: "2022-08-13" max_active_runs: 1 schedule_interval: "@daily" catchup: False default_view: graph tasks: - operator: "GoogleCloudStorageToGoogleCloudStorageOperator" description: "Task to archive the CSV file in the destination bucket" args: task_id: "download_zip_file_to_composer_bucket" source_bucket: "pdp-feeds-staging" source_object: "FEC/unzipped/indiv20" destination_bucket: "{{ var.value.composer_bucket }}" destination_object: "data/fec/individuals/indiv20.txt" move_object: False - operator: "BashOperator" description: "Task to split the text file" args: task_id: "split_file" bash_command: | split -l 23000000 --additional-suffix=.txt "$data_dir/individuals/indiv20.txt" "$data_dir/individuals/indiv20_" env: data_dir: /home/airflow/gcs/data/fec - operator: "GKECreateClusterOperator" args: task_id: "create_cluster" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" body: name: pdp-fec-individuals-ingest-2020 initial_node_count: 1 network: "{{ var.value.vpc_network }}" node_config: machine_type: e2-standard-16 oauth_scopes: - https://www.googleapis.com/auth/devstorage.read_write - https://www.googleapis.com/auth/cloud-platform - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "individuals_ingest_2020_transform_csv_1" startup_timeout_seconds: 600 name: "individuals_ingest_2020" namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: pdp-fec-individuals-ingest-2020 image_pull_policy: "Always" image: "{{ var.json.fec.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_GCS_BUCKET: "{{ var.value.composer_bucket }}" SOURCE_GCS_OBJECT: "data/fec/individuals/indiv20_aa.txt" SOURCE_FILE: "files/indiv20_aa.txt" TARGET_FILE: "files/data_output.csv" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/fec/individuals/data_output_ingest_1.csv" CHUNKSIZE: "1000000" PIPELINE_NAME: "individuals_ingest_2020" CSV_HEADERS: >- ["cmte_id","amndt_ind","rpt_tp","transaction_pgi","image_num","transaction_tp","entity_tp","name","city","state", "zip_code","employer","occupation","transaction_dt","transaction_amt","other_id","tran_id","file_num", "memo_cd","memo_text","sub_id"] RENAME_MAPPINGS: >- {"0":"cmte_id","1":"amndt_ind","2":"rpt_tp","3":"transaction_pgi","4":"image_num","5":"transaction_tp", "6":"entity_tp","7":"name","8":"city","9":"state","10":"zip_code","11":"employer", "12":"occupation","13":"transaction_dt","14":"transaction_amt","15":"other_id","16":"tran_id", "17":"file_num","18":"memo_cd","19":"memo_text","20":"sub_id"} container_resources: memory: request: "32Gi" cpu: request: "2" ephemeral-storage: request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: task_id: "load_individuals_ingest_2020_to_bq_1" bucket: "{{ var.value.composer_bucket }}" source_objects: ["data/fec/individuals/data_output_ingest_1.csv"] source_format: "CSV" destination_project_dataset_table: "fec.individuals_ingest_2020" skip_leading_rows: 1 allow_quoted_newlines: True write_disposition: "WRITE_TRUNCATE" schema_fields: - name: "cmte_id" type: "string" description: "Filer Identification Number" mode: "nullable" - name: "amndt_ind" type: "string" description: "Amendment Indicator" mode: "nullable" - name: "rpt_tp" type: "string" description: "Report Type" mode: "nullable" - name: "transaction_pgi" type: "string" description: "Primary-General Indicator" mode: "nullable" - name: "image_num" type: "integer" description: "Image Number" mode: "nullable" - name: "transaction_tp" type: "string" description: "Transaction Type" mode: "nullable" - name: "entity_tp" type: "string" description: "Entity Type" mode: "nullable" - name: "name" type: "string" description: "Contributor/Lender/Transfer Name" mode: "nullable" - name: "city" type: "string" description: "City/Town" mode: "nullable" - name: "state" type: "string" description: "State" mode: "nullable" - name: "zip_code" type: "string" description: "Zip Code" mode: "nullable" - name: "employer" type: "string" description: "Employer" mode: "nullable" - name: "occupation" type: "string" description: "Occupation" mode: "nullable" - name: "transaction_dt" type: "date" description: "Transaction Date(MMDDYYYY)" mode: "nullable" - name: "transaction_amt" type: "float" description: "Transaction Amount" mode: "nullable" - name: "other_id" type: "string" description: "Other Identification Number" mode: "nullable" - name: "tran_id" type: "string" description: "Transaction ID" mode: "nullable" - name: "file_num" type: "integer" description: "File Number / Report ID" mode: "nullable" - name: "memo_cd" type: "string" description: "Memo Code" mode: "nullable" - name: "memo_text" type: "string" description: "Memo Text" mode: "nullable" - name: "sub_id" type: "integer" description: "FEC Record Number" mode: "nullable" - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "individuals_ingest_2020_transform_csv_2" startup_timeout_seconds: 600 name: "individuals_ingest_2020" namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: pdp-fec-individuals-ingest-2020 image_pull_policy: "Always" image: "{{ var.json.fec.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_GCS_BUCKET: "{{ var.value.composer_bucket }}" SOURCE_GCS_OBJECT: "data/fec/individuals/indiv20_ab.txt" SOURCE_FILE: "files/indiv20_ab.txt" TARGET_FILE: "files/data_output.csv" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/fec/individuals/data_output_ingest_2.csv" CHUNKSIZE: "100000" PIPELINE_NAME: "individuals_ingest_2020" CSV_HEADERS: >- ["cmte_id","amndt_ind","rpt_tp","transaction_pgi","image_num","transaction_tp","entity_tp","name","city","state", "zip_code","employer","occupation","transaction_dt","transaction_amt","other_id","tran_id","file_num", "memo_cd","memo_text","sub_id"] RENAME_MAPPINGS: >- {"0":"cmte_id","1":"amndt_ind","2":"rpt_tp","3":"transaction_pgi","4":"image_num","5":"transaction_tp", "6":"entity_tp","7":"name","8":"city","9":"state","10":"zip_code","11":"employer", "12":"occupation","13":"transaction_dt","14":"transaction_amt","15":"other_id","16":"tran_id", "17":"file_num","18":"memo_cd","19":"memo_text","20":"sub_id"} container_resources: memory: request: "32Gi" cpu: request: "2" ephemeral-storage: request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: task_id: "load_individuals_ingest_2020_to_bq_2" bucket: "{{ var.value.composer_bucket }}" source_objects: ["data/fec/individuals/data_output_ingest_2.csv"] source_format: "CSV" destination_project_dataset_table: "fec.individuals_ingest_2020" skip_leading_rows: 1 allow_quoted_newlines: True write_disposition: "WRITE_APPEND" schema_fields: - name: "cmte_id" type: "string" description: "Filer Identification Number" mode: "nullable" - name: "amndt_ind" type: "string" description: "Amendment Indicator" mode: "nullable" - name: "rpt_tp" type: "string" description: "Report Type" mode: "nullable" - name: "transaction_pgi" type: "string" description: "Primary-General Indicator" mode: "nullable" - name: "image_num" type: "integer" description: "Image Number" mode: "nullable" - name: "transaction_tp" type: "string" description: "Transaction Type" mode: "nullable" - name: "entity_tp" type: "string" description: "Entity Type" mode: "nullable" - name: "name" type: "string" description: "Contributor/Lender/Transfer Name" mode: "nullable" - name: "city" type: "string" description: "City/Town" mode: "nullable" - name: "state" type: "string" description: "State" mode: "nullable" - name: "zip_code" type: "string" description: "Zip Code" mode: "nullable" - name: "employer" type: "string" description: "Employer" mode: "nullable" - name: "occupation" type: "string" description: "Occupation" mode: "nullable" - name: "transaction_dt" type: "date" description: "Transaction Date(MMDDYYYY)" mode: "nullable" - name: "transaction_amt" type: "float" description: "Transaction Amount" mode: "nullable" - name: "other_id" type: "string" description: "Other Identification Number" mode: "nullable" - name: "tran_id" type: "string" description: "Transaction ID" mode: "nullable" - name: "file_num" type: "integer" description: "File Number / Report ID" mode: "nullable" - name: "memo_cd" type: "string" description: "Memo Code" mode: "nullable" - name: "memo_text" type: "string" description: "Memo Text" mode: "nullable" - name: "sub_id" type: "integer" description: "FEC Record Number" mode: "nullable" - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "individuals_ingest_2020_transform_csv_3" startup_timeout_seconds: 600 name: "individuals_ingest_2020" namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: pdp-fec-individuals-ingest-2020 image_pull_policy: "Always" image: "{{ var.json.fec.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_GCS_BUCKET: "{{ var.value.composer_bucket }}" SOURCE_GCS_OBJECT: "data/fec/individuals/indiv20_ac.txt" SOURCE_FILE: "files/indiv20_ac.txt" TARGET_FILE: "files/data_output.csv" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/fec/individuals/data_output_ingest_3.csv" CHUNKSIZE: "100000" PIPELINE_NAME: "individuals_ingest_2020" CSV_HEADERS: >- ["cmte_id","amndt_ind","rpt_tp","transaction_pgi","image_num","transaction_tp","entity_tp","name","city","state", "zip_code","employer","occupation","transaction_dt","transaction_amt","other_id","tran_id","file_num", "memo_cd","memo_text","sub_id"] RENAME_MAPPINGS: >- {"0":"cmte_id","1":"amndt_ind","2":"rpt_tp","3":"transaction_pgi","4":"image_num","5":"transaction_tp", "6":"entity_tp","7":"name","8":"city","9":"state","10":"zip_code","11":"employer", "12":"occupation","13":"transaction_dt","14":"transaction_amt","15":"other_id","16":"tran_id", "17":"file_num","18":"memo_cd","19":"memo_text","20":"sub_id"} container_resources: memory: request: "32Gi" cpu: request: "2" ephemeral-storage: request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: task_id: "load_individuals_ingest_2020_to_bq_3" bucket: "{{ var.value.composer_bucket }}" source_objects: ["data/fec/individuals/data_output_ingest_3.csv"] source_format: "CSV" destination_project_dataset_table: "fec.individuals_ingest_2020" skip_leading_rows: 1 allow_quoted_newlines: True write_disposition: "WRITE_APPEND" schema_fields: - name: "cmte_id" type: "string" description: "Filer Identification Number" mode: "nullable" - name: "amndt_ind" type: "string" description: "Amendment Indicator" mode: "nullable" - name: "rpt_tp" type: "string" description: "Report Type" mode: "nullable" - name: "transaction_pgi" type: "string" description: "Primary-General Indicator" mode: "nullable" - name: "image_num" type: "integer" description: "Image Number" mode: "nullable" - name: "transaction_tp" type: "string" description: "Transaction Type" mode: "nullable" - name: "entity_tp" type: "string" description: "Entity Type" mode: "nullable" - name: "name" type: "string" description: "Contributor/Lender/Transfer Name" mode: "nullable" - name: "city" type: "string" description: "City/Town" mode: "nullable" - name: "state" type: "string" description: "State" mode: "nullable" - name: "zip_code" type: "string" description: "Zip Code" mode: "nullable" - name: "employer" type: "string" description: "Employer" mode: "nullable" - name: "occupation" type: "string" description: "Occupation" mode: "nullable" - name: "transaction_dt" type: "date" description: "Transaction Date(MMDDYYYY)" mode: "nullable" - name: "transaction_amt" type: "float" description: "Transaction Amount" mode: "nullable" - name: "other_id" type: "string" description: "Other Identification Number" mode: "nullable" - name: "tran_id" type: "string" description: "Transaction ID" mode: "nullable" - name: "file_num" type: "integer" description: "File Number / Report ID" mode: "nullable" - name: "memo_cd" type: "string" description: "Memo Code" mode: "nullable" - name: "memo_text" type: "string" description: "Memo Text" mode: "nullable" - name: "sub_id" type: "integer" description: "FEC Record Number" mode: "nullable" - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "individuals_ingest_2020_transform_csv_4" startup_timeout_seconds: 600 name: "individuals_ingest_2020" namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: pdp-fec-individuals-ingest-2020 image_pull_policy: "Always" image: "{{ var.json.fec.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_GCS_BUCKET: "{{ var.value.composer_bucket }}" SOURCE_GCS_OBJECT: "data/fec/individuals/indiv20_ad.txt" SOURCE_FILE: "files/indiv20_ad.txt" TARGET_FILE: "files/data_output.csv" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/fec/individuals/data_output_ingest_4.csv" CHUNKSIZE: "100000" PIPELINE_NAME: "individuals_ingest_2020" CSV_HEADERS: >- ["cmte_id","amndt_ind","rpt_tp","transaction_pgi","image_num","transaction_tp","entity_tp","name","city","state", "zip_code","employer","occupation","transaction_dt","transaction_amt","other_id","tran_id","file_num", "memo_cd","memo_text","sub_id"] RENAME_MAPPINGS: >- {"0":"cmte_id","1":"amndt_ind","2":"rpt_tp","3":"transaction_pgi","4":"image_num","5":"transaction_tp", "6":"entity_tp","7":"name","8":"city","9":"state","10":"zip_code","11":"employer", "12":"occupation","13":"transaction_dt","14":"transaction_amt","15":"other_id","16":"tran_id", "17":"file_num","18":"memo_cd","19":"memo_text","20":"sub_id"} container_resources: memory: request: "32Gi" cpu: request: "2" ephemeral-storage: request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: task_id: "load_individuals_ingest_2020_to_bq_4" bucket: "{{ var.value.composer_bucket }}" source_objects: ["data/fec/individuals/data_output_ingest_4.csv"] source_format: "CSV" destination_project_dataset_table: "fec.individuals_ingest_2020" skip_leading_rows: 1 allow_quoted_newlines: True write_disposition: "WRITE_APPEND" schema_fields: - name: "cmte_id" type: "string" description: "Filer Identification Number" mode: "nullable" - name: "amndt_ind" type: "string" description: "Amendment Indicator" mode: "nullable" - name: "rpt_tp" type: "string" description: "Report Type" mode: "nullable" - name: "transaction_pgi" type: "string" description: "Primary-General Indicator" mode: "nullable" - name: "image_num" type: "integer" description: "Image Number" mode: "nullable" - name: "transaction_tp" type: "string" description: "Transaction Type" mode: "nullable" - name: "entity_tp" type: "string" description: "Entity Type" mode: "nullable" - name: "name" type: "string" description: "Contributor/Lender/Transfer Name" mode: "nullable" - name: "city" type: "string" description: "City/Town" mode: "nullable" - name: "state" type: "string" description: "State" mode: "nullable" - name: "zip_code" type: "string" description: "Zip Code" mode: "nullable" - name: "employer" type: "string" description: "Employer" mode: "nullable" - name: "occupation" type: "string" description: "Occupation" mode: "nullable" - name: "transaction_dt" type: "date" description: "Transaction Date(MMDDYYYY)" mode: "nullable" - name: "transaction_amt" type: "float" description: "Transaction Amount" mode: "nullable" - name: "other_id" type: "string" description: "Other Identification Number" mode: "nullable" - name: "tran_id" type: "string" description: "Transaction ID" mode: "nullable" - name: "file_num" type: "integer" description: "File Number / Report ID" mode: "nullable" - name: "memo_cd" type: "string" description: "Memo Code" mode: "nullable" - name: "memo_text" type: "string" description: "Memo Text" mode: "nullable" - name: "sub_id" type: "integer" description: "FEC Record Number" mode: "nullable" - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "individuals_ingest_2020_transform_csv_5" startup_timeout_seconds: 600 name: "individuals_ingest_2020" namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: pdp-fec-individuals-ingest-2020 image_pull_policy: "Always" image: "{{ var.json.fec.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_GCS_BUCKET: "{{ var.value.composer_bucket }}" SOURCE_GCS_OBJECT: "data/fec/individuals/indiv20_ae.txt" SOURCE_FILE: "files/indiv20_ae.txt" TARGET_FILE: "files/data_output.csv" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/fec/individuals/data_output_ingest_5.csv" CHUNKSIZE: "100000" PIPELINE_NAME: "individuals_ingest_2020" CSV_HEADERS: >- ["cmte_id","amndt_ind","rpt_tp","transaction_pgi","image_num","transaction_tp","entity_tp","name","city","state", "zip_code","employer","occupation","transaction_dt","transaction_amt","other_id","tran_id","file_num", "memo_cd","memo_text","sub_id"] RENAME_MAPPINGS: >- {"0":"cmte_id","1":"amndt_ind","2":"rpt_tp","3":"transaction_pgi","4":"image_num","5":"transaction_tp", "6":"entity_tp","7":"name","8":"city","9":"state","10":"zip_code","11":"employer", "12":"occupation","13":"transaction_dt","14":"transaction_amt","15":"other_id","16":"tran_id", "17":"file_num","18":"memo_cd","19":"memo_text","20":"sub_id"} container_resources: memory: request: "32Gi" cpu: request: "2" ephemeral-storage: request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: task_id: "load_individuals_ingest_2020_to_bq_5" bucket: "{{ var.value.composer_bucket }}" source_objects: ["data/fec/individuals/data_output_ingest_5.csv"] source_format: "CSV" destination_project_dataset_table: "fec.individuals_ingest_2020" skip_leading_rows: 1 allow_quoted_newlines: True write_disposition: "WRITE_APPEND" schema_fields: - name: "cmte_id" type: "string" description: "Filer Identification Number" mode: "nullable" - name: "amndt_ind" type: "string" description: "Amendment Indicator" mode: "nullable" - name: "rpt_tp" type: "string" description: "Report Type" mode: "nullable" - name: "transaction_pgi" type: "string" description: "Primary-General Indicator" mode: "nullable" - name: "image_num" type: "integer" description: "Image Number" mode: "nullable" - name: "transaction_tp" type: "string" description: "Transaction Type" mode: "nullable" - name: "entity_tp" type: "string" description: "Entity Type" mode: "nullable" - name: "name" type: "string" description: "Contributor/Lender/Transfer Name" mode: "nullable" - name: "city" type: "string" description: "City/Town" mode: "nullable" - name: "state" type: "string" description: "State" mode: "nullable" - name: "zip_code" type: "string" description: "Zip Code" mode: "nullable" - name: "employer" type: "string" description: "Employer" mode: "nullable" - name: "occupation" type: "string" description: "Occupation" mode: "nullable" - name: "transaction_dt" type: "date" description: "Transaction Date(MMDDYYYY)" mode: "nullable" - name: "transaction_amt" type: "float" description: "Transaction Amount" mode: "nullable" - name: "other_id" type: "string" description: "Other Identification Number" mode: "nullable" - name: "tran_id" type: "string" description: "Transaction ID" mode: "nullable" - name: "file_num" type: "integer" description: "File Number / Report ID" mode: "nullable" - name: "memo_cd" type: "string" description: "Memo Code" mode: "nullable" - name: "memo_text" type: "string" description: "Memo Text" mode: "nullable" - name: "sub_id" type: "integer" description: "FEC Record Number" mode: "nullable" - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "individuals_ingest_2020_transform_csv_6" startup_timeout_seconds: 600 name: "individuals_ingest_2020" namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: pdp-fec-individuals-ingest-2020 image_pull_policy: "Always" image: "{{ var.json.fec.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_GCS_BUCKET: "{{ var.value.composer_bucket }}" SOURCE_GCS_OBJECT: "data/fec/individuals/indiv20_af.txt" SOURCE_FILE: "files/indiv20_af.txt" TARGET_FILE: "files/data_output.csv" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/fec/individuals/data_output_ingest_6.csv" CHUNKSIZE: "100000" PIPELINE_NAME: "individuals_ingest_2020" CSV_HEADERS: >- ["cmte_id","amndt_ind","rpt_tp","transaction_pgi","image_num","transaction_tp","entity_tp","name","city","state", "zip_code","employer","occupation","transaction_dt","transaction_amt","other_id","tran_id","file_num", "memo_cd","memo_text","sub_id"] RENAME_MAPPINGS: >- {"0":"cmte_id","1":"amndt_ind","2":"rpt_tp","3":"transaction_pgi","4":"image_num","5":"transaction_tp", "6":"entity_tp","7":"name","8":"city","9":"state","10":"zip_code","11":"employer", "12":"occupation","13":"transaction_dt","14":"transaction_amt","15":"other_id","16":"tran_id", "17":"file_num","18":"memo_cd","19":"memo_text","20":"sub_id"} container_resources: memory: request: "32Gi" cpu: request: "2" ephemeral-storage: request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: task_id: "load_individuals_ingest_2020_to_bq_6" bucket: "{{ var.value.composer_bucket }}" source_objects: ["data/fec/individuals/data_output_ingest_6.csv"] source_format: "CSV" destination_project_dataset_table: "fec.individuals_ingest_2020" skip_leading_rows: 1 allow_quoted_newlines: True write_disposition: "WRITE_APPEND" schema_fields: - name: "cmte_id" type: "string" description: "Filer Identification Number" mode: "nullable" - name: "amndt_ind" type: "string" description: "Amendment Indicator" mode: "nullable" - name: "rpt_tp" type: "string" description: "Report Type" mode: "nullable" - name: "transaction_pgi" type: "string" description: "Primary-General Indicator" mode: "nullable" - name: "image_num" type: "integer" description: "Image Number" mode: "nullable" - name: "transaction_tp" type: "string" description: "Transaction Type" mode: "nullable" - name: "entity_tp" type: "string" description: "Entity Type" mode: "nullable" - name: "name" type: "string" description: "Contributor/Lender/Transfer Name" mode: "nullable" - name: "city" type: "string" description: "City/Town" mode: "nullable" - name: "state" type: "string" description: "State" mode: "nullable" - name: "zip_code" type: "string" description: "Zip Code" mode: "nullable" - name: "employer" type: "string" description: "Employer" mode: "nullable" - name: "occupation" type: "string" description: "Occupation" mode: "nullable" - name: "transaction_dt" type: "date" description: "Transaction Date(MMDDYYYY)" mode: "nullable" - name: "transaction_amt" type: "float" description: "Transaction Amount" mode: "nullable" - name: "other_id" type: "string" description: "Other Identification Number" mode: "nullable" - name: "tran_id" type: "string" description: "Transaction ID" mode: "nullable" - name: "file_num" type: "integer" description: "File Number / Report ID" mode: "nullable" - name: "memo_cd" type: "string" description: "Memo Code" mode: "nullable" - name: "memo_text" type: "string" description: "Memo Text" mode: "nullable" - name: "sub_id" type: "integer" description: "FEC Record Number" mode: "nullable" - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "individuals_ingest_2020_transform_csv_7" startup_timeout_seconds: 600 name: "individuals_ingest_2020" namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: pdp-fec-individuals-ingest-2020 image_pull_policy: "Always" image: "{{ var.json.fec.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_GCS_BUCKET: "{{ var.value.composer_bucket }}" SOURCE_GCS_OBJECT: "data/fec/individuals/indiv20_ag.txt" SOURCE_FILE: "files/indiv20_ag.txt" TARGET_FILE: "files/data_output.csv" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/fec/individuals/data_output_ingest_7.csv" CHUNKSIZE: "100000" PIPELINE_NAME: "individuals_ingest_2020" CSV_HEADERS: >- ["cmte_id","amndt_ind","rpt_tp","transaction_pgi","image_num","transaction_tp","entity_tp","name","city","state", "zip_code","employer","occupation","transaction_dt","transaction_amt","other_id","tran_id","file_num", "memo_cd","memo_text","sub_id"] RENAME_MAPPINGS: >- {"0":"cmte_id","1":"amndt_ind","2":"rpt_tp","3":"transaction_pgi","4":"image_num","5":"transaction_tp", "6":"entity_tp","7":"name","8":"city","9":"state","10":"zip_code","11":"employer", "12":"occupation","13":"transaction_dt","14":"transaction_amt","15":"other_id","16":"tran_id", "17":"file_num","18":"memo_cd","19":"memo_text","20":"sub_id"} container_resources: memory: request: "32Gi" cpu: request: "2" ephemeral-storage: request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: task_id: "load_individuals_ingest_2020_to_bq_7" bucket: "{{ var.value.composer_bucket }}" source_objects: ["data/fec/individuals/data_output_ingest_7.csv"] source_format: "CSV" destination_project_dataset_table: "fec.individuals_ingest_2020" skip_leading_rows: 1 allow_quoted_newlines: True write_disposition: "WRITE_APPEND" schema_fields: - name: "cmte_id" type: "string" description: "Filer Identification Number" mode: "nullable" - name: "amndt_ind" type: "string" description: "Amendment Indicator" mode: "nullable" - name: "rpt_tp" type: "string" description: "Report Type" mode: "nullable" - name: "transaction_pgi" type: "string" description: "Primary-General Indicator" mode: "nullable" - name: "image_num" type: "integer" description: "Image Number" mode: "nullable" - name: "transaction_tp" type: "string" description: "Transaction Type" mode: "nullable" - name: "entity_tp" type: "string" description: "Entity Type" mode: "nullable" - name: "name" type: "string" description: "Contributor/Lender/Transfer Name" mode: "nullable" - name: "city" type: "string" description: "City/Town" mode: "nullable" - name: "state" type: "string" description: "State" mode: "nullable" - name: "zip_code" type: "string" description: "Zip Code" mode: "nullable" - name: "employer" type: "string" description: "Employer" mode: "nullable" - name: "occupation" type: "string" description: "Occupation" mode: "nullable" - name: "transaction_dt" type: "date" description: "Transaction Date(MMDDYYYY)" mode: "nullable" - name: "transaction_amt" type: "float" description: "Transaction Amount" mode: "nullable" - name: "other_id" type: "string" description: "Other Identification Number" mode: "nullable" - name: "tran_id" type: "string" description: "Transaction ID" mode: "nullable" - name: "file_num" type: "integer" description: "File Number / Report ID" mode: "nullable" - name: "memo_cd" type: "string" description: "Memo Code" mode: "nullable" - name: "memo_text" type: "string" description: "Memo Text" mode: "nullable" - name: "sub_id" type: "integer" description: "FEC Record Number" mode: "nullable" - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "individuals_ingest_2020_transform_csv_8" startup_timeout_seconds: 600 name: "individuals_ingest_2020" namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: pdp-fec-individuals-ingest-2020 image_pull_policy: "Always" image: "{{ var.json.fec.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_GCS_BUCKET: "{{ var.value.composer_bucket }}" SOURCE_GCS_OBJECT: "data/fec/individuals/indiv20_ah.txt" SOURCE_FILE: "files/indiv20_ah.txt" TARGET_FILE: "files/data_output.csv" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/fec/individuals/data_output_ingest_8.csv" CHUNKSIZE: "100000" PIPELINE_NAME: "individuals_ingest_2020" CSV_HEADERS: >- ["cmte_id","amndt_ind","rpt_tp","transaction_pgi","image_num","transaction_tp","entity_tp","name","city","state", "zip_code","employer","occupation","transaction_dt","transaction_amt","other_id","tran_id","file_num", "memo_cd","memo_text","sub_id"] RENAME_MAPPINGS: >- {"0":"cmte_id","1":"amndt_ind","2":"rpt_tp","3":"transaction_pgi","4":"image_num","5":"transaction_tp", "6":"entity_tp","7":"name","8":"city","9":"state","10":"zip_code","11":"employer", "12":"occupation","13":"transaction_dt","14":"transaction_amt","15":"other_id","16":"tran_id", "17":"file_num","18":"memo_cd","19":"memo_text","20":"sub_id"} container_resources: memory: request: "32Gi" cpu: request: "2" ephemeral-storage: request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: task_id: "load_individuals_ingest_2020_to_bq_8" bucket: "{{ var.value.composer_bucket }}" source_objects: ["data/fec/individuals/data_output_ingest_8.csv"] source_format: "CSV" destination_project_dataset_table: "fec.individuals_ingest_2020" skip_leading_rows: 1 allow_quoted_newlines: True write_disposition: "WRITE_APPEND" schema_fields: - name: "cmte_id" type: "string" description: "Filer Identification Number" mode: "nullable" - name: "amndt_ind" type: "string" description: "Amendment Indicator" mode: "nullable" - name: "rpt_tp" type: "string" description: "Report Type" mode: "nullable" - name: "transaction_pgi" type: "string" description: "Primary-General Indicator" mode: "nullable" - name: "image_num" type: "integer" description: "Image Number" mode: "nullable" - name: "transaction_tp" type: "string" description: "Transaction Type" mode: "nullable" - name: "entity_tp" type: "string" description: "Entity Type" mode: "nullable" - name: "name" type: "string" description: "Contributor/Lender/Transfer Name" mode: "nullable" - name: "city" type: "string" description: "City/Town" mode: "nullable" - name: "state" type: "string" description: "State" mode: "nullable" - name: "zip_code" type: "string" description: "Zip Code" mode: "nullable" - name: "employer" type: "string" description: "Employer" mode: "nullable" - name: "occupation" type: "string" description: "Occupation" mode: "nullable" - name: "transaction_dt" type: "date" description: "Transaction Date(MMDDYYYY)" mode: "nullable" - name: "transaction_amt" type: "float" description: "Transaction Amount" mode: "nullable" - name: "other_id" type: "string" description: "Other Identification Number" mode: "nullable" - name: "tran_id" type: "string" description: "Transaction ID" mode: "nullable" - name: "file_num" type: "integer" description: "File Number / Report ID" mode: "nullable" - name: "memo_cd" type: "string" description: "Memo Code" mode: "nullable" - name: "memo_text" type: "string" description: "Memo Text" mode: "nullable" - name: "sub_id" type: "integer" description: "FEC Record Number" mode: "nullable" - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "individuals_ingest_2020_transform_csv_9" startup_timeout_seconds: 600 name: "individuals_ingest_2020" namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: pdp-fec-individuals-ingest-2020 image_pull_policy: "Always" image: "{{ var.json.fec.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_GCS_BUCKET: "{{ var.value.composer_bucket }}" SOURCE_GCS_OBJECT: "data/fec/individuals/indiv20_ai.txt" SOURCE_FILE: "files/indiv20_ai.txt" TARGET_FILE: "files/data_output.csv" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/fec/individuals/data_output_ingest_9.csv" CHUNKSIZE: "100000" PIPELINE_NAME: "individuals_ingest_2020" CSV_HEADERS: >- ["cmte_id","amndt_ind","rpt_tp","transaction_pgi","image_num","transaction_tp","entity_tp","name","city","state", "zip_code","employer","occupation","transaction_dt","transaction_amt","other_id","tran_id","file_num", "memo_cd","memo_text","sub_id"] container_resources: memory: request: "32Gi" cpu: request: "2" ephemeral-storage: request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: task_id: "load_individuals_ingest_2020_to_bq_9" bucket: "{{ var.value.composer_bucket }}" source_objects: ["data/fec/individuals/data_output_ingest_9.csv"] source_format: "CSV" destination_project_dataset_table: "fec.individuals_ingest_2020" skip_leading_rows: 1 allow_quoted_newlines: True write_disposition: "WRITE_APPEND" schema_fields: - name: "cmte_id" type: "string" description: "Filer Identification Number" mode: "nullable" - name: "amndt_ind" type: "string" description: "Amendment Indicator" mode: "nullable" - name: "rpt_tp" type: "string" description: "Report Type" mode: "nullable" - name: "transaction_pgi" type: "string" description: "Primary-General Indicator" mode: "nullable" - name: "image_num" type: "integer" description: "Image Number" mode: "nullable" - name: "transaction_tp" type: "string" description: "Transaction Type" mode: "nullable" - name: "entity_tp" type: "string" description: "Entity Type" mode: "nullable" - name: "name" type: "string" description: "Contributor/Lender/Transfer Name" mode: "nullable" - name: "city" type: "string" description: "City/Town" mode: "nullable" - name: "state" type: "string" description: "State" mode: "nullable" - name: "zip_code" type: "string" description: "Zip Code" mode: "nullable" - name: "employer" type: "string" description: "Employer" mode: "nullable" - name: "occupation" type: "string" description: "Occupation" mode: "nullable" - name: "transaction_dt" type: "date" description: "Transaction Date(MMDDYYYY)" mode: "nullable" - name: "transaction_amt" type: "float" description: "Transaction Amount" mode: "nullable" - name: "other_id" type: "string" description: "Other Identification Number" mode: "nullable" - name: "tran_id" type: "string" description: "Transaction ID" mode: "nullable" - name: "file_num" type: "integer" description: "File Number / Report ID" mode: "nullable" - name: "memo_cd" type: "string" description: "Memo Code" mode: "nullable" - name: "memo_text" type: "string" description: "Memo Text" mode: "nullable" - name: "sub_id" type: "integer" description: "FEC Record Number" mode: "nullable" - operator: "GKEDeleteClusterOperator" args: task_id: "delete_cluster" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" name: pdp-fec-individuals-ingest-2020 graph_paths: - "download_zip_file_to_composer_bucket >> create_cluster >> split_file >> [individuals_ingest_2020_transform_csv_1, individuals_ingest_2020_transform_csv_2, individuals_ingest_2020_transform_csv_3, individuals_ingest_2020_transform_csv_4, individuals_ingest_2020_transform_csv_5, individuals_ingest_2020_transform_csv_6, individuals_ingest_2020_transform_csv_7, individuals_ingest_2020_transform_csv_8, individuals_ingest_2020_transform_csv_9] >> delete_cluster >> [load_individuals_ingest_2020_to_bq_1, load_individuals_ingest_2020_to_bq_2, load_individuals_ingest_2020_to_bq_3, load_individuals_ingest_2020_to_bq_4, load_individuals_ingest_2020_to_bq_5, load_individuals_ingest_2020_to_bq_6, load_individuals_ingest_2020_to_bq_7, load_individuals_ingest_2020_to_bq_8, load_individuals_ingest_2020_to_bq_9]"