datasets/bls/pipelines/unemployment_cps_series/pipeline.yaml (188 lines of code) (raw):

# Copyright 2021 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. --- resources: - type: bigquery_table table_id: unemployment_cps_series description: "Unemployment_CPS_Series Dataset" dag: airflow_version: 2 initialize: dag_id: "unemployment_cps_series" default_args: owner: "Google" depends_on_past: False start_date: '2021-03-01' max_active_runs: 1 schedule_interval: "@daily" catchup: False default_view: graph tasks: - operator: "KubernetesPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_csv" startup_timeout_seconds: 600 name: "unemployment_cps_series" namespace: "composer-user-workloads" service_account_name: "default" config_file: "/home/airflow/composer_kube_config" image_pull_policy: "Always" image: "{{ var.json.bls.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_URLS: >- ["gs://pdp-feeds-staging/Bureau/ln.series.tsv"] SOURCE_FILES: >- ["files/data1.tsv"] CHUNKSIZE: "50000" TARGET_FILE: "files/data_output.csv" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/bls/unemployment_cps_series/data_output.csv" PIPELINE_NAME: "unemployment_cps_series" JOINING_KEY: "" TRIM_SPACE: >- ["series_id","footnote_codes","series_title"] CSV_HEADERS: >- ["series_id","lfst_code","periodicity_code","series_title","absn_code","activity_code","ages_code","class_code","duration_code","education_code","entr_code","expr_code","hheader_code","hour_code","indy_code","jdes_code","look_code","mari_code","mjhs_code","occupation_code","orig_code","pcts_code","race_code","rjnw_code","rnlf_code","rwns_code","seek_code","sexs_code","tdat_code","vets_code","wkst_code","born_code","chld_code","disa_code","seasonal","footnote_codes","begin_year","begin_period","end_year","end_period","cert_code"] container_resources: memory: request: "80Gi" cpu: request: "2" ephemeral-storage: request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: task_id: "load_to_bq" bucket: "{{ var.value.composer_bucket }}" source_objects: ["data/bls/unemployment_cps_series/data_output-*.csv"] source_format: "CSV" destination_project_dataset_table: "bls.unemployment_cps_series" skip_leading_rows: 1 allow_quoted_newlines: True write_disposition: "WRITE_TRUNCATE" schema_fields: - name: "series_id" type: "STRING" mode: "required" - name: "lfst_code" type: "INTEGER" mode: "NULLABLE" - name: "periodicity_code" type: "STRING" mode: "NULLABLE" - name: "series_title" type: "STRING" mode: "NULLABLE" - name: "absn_code" type: "INTEGER" mode: "NULLABLE" - name: "activity_code" type: "INTEGER" mode: "NULLABLE" - name: "ages_code" type: "INTEGER" mode: "NULLABLE" - name: "class_code" type: "INTEGER" mode: "NULLABLE" - name: "duration_code" type: "INTEGER" mode: "NULLABLE" - name: "education_code" type: "INTEGER" mode: "NULLABLE" - name: "entr_code" type: "INTEGER" mode: "NULLABLE" - name: "expr_code" type: "INTEGER" mode: "NULLABLE" - name: "hheader_code" type: "INTEGER" mode: "NULLABLE" - name: "hour_code" type: "INTEGER" mode: "NULLABLE" - name: "indy_code" type: "INTEGER" mode: "NULLABLE" - name: "jdes_code" type: "INTEGER" mode: "NULLABLE" - name: "look_code" type: "INTEGER" mode: "NULLABLE" - name: "mari_code" type: "INTEGER" mode: "NULLABLE" - name: "mjhs_code" type: "INTEGER" mode: "NULLABLE" - name: "occupation_code" type: "INTEGER" mode: "NULLABLE" - name: "orig_code" type: "INTEGER" mode: "NULLABLE" - name: "pcts_code" type: "INTEGER" mode: "NULLABLE" - name: "race_code" type: "INTEGER" mode: "NULLABLE" - name: "rjnw_code" type: "INTEGER" mode: "NULLABLE" - name: "rnlf_code" type: "INTEGER" mode: "NULLABLE" - name: "rwns_code" type: "INTEGER" mode: "NULLABLE" - name: "seek_code" type: "INTEGER" mode: "NULLABLE" - name: "sexs_code" type: "INTEGER" mode: "NULLABLE" - name: "tdat_code" type: "INTEGER" mode: "NULLABLE" - name: "vets_code" type: "INTEGER" mode: "NULLABLE" - name: "wkst_code" type: "INTEGER" mode: "NULLABLE" - name: "born_code" type: "INTEGER" mode: "NULLABLE" - name: "chld_code" type: "INTEGER" mode: "NULLABLE" - name: "disa_code" type: "INTEGER" mode: "NULLABLE" - name: "seasonal" type: "STRING" mode: "NULLABLE" - name: "footnote_codes" type: "STRING" mode: "NULLABLE" - name: "begin_year" type: "INTEGER" mode: "NULLABLE" - name: "begin_period" type: "STRING" mode: "NULLABLE" - name: "end_year" type: "INTEGER" mode: "NULLABLE" - name: "end_period" type: "STRING" mode: "NULLABLE" - name: "cert_code" type: "INTEGER" mode: "NULLABLE" graph_paths: - "transform_csv >> load_to_bq"