datasets/covid19_italy/pipelines/data_by_province/pipeline.yaml (152 lines of code) (raw):
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
---
# BigQuery tables declared for this pipeline. Both entries use the same
# table_id and description; only the second one names a dataset_id.
resources:
  - type: bigquery_table
    # No dataset_id is given for this entry.
    # NOTE(review): presumably resolves to the pipeline's default dataset
    # (the load task below targets covid19_italy) — confirm.
    table_id: data_by_province
    description: "COVID-19 Italy Data By Province"

  - type: bigquery_table
    dataset_id: covid19_italy_eu
    table_id: data_by_province
    description: "COVID-19 Italy Data By Province"
# DAG definition: one Kubernetes pod transforms the source CSV, a bash task
# copies the transformed file to a second GCS location, and two load tasks
# write the CSV into the covid19_italy and covid19_italy_eu tables.
dag:
  airflow_version: 2

  initialize:
    dag_id: data_by_province
    default_args:
      owner: "Google"
      depends_on_past: false
      # Quoted so the value stays a string instead of being parsed as a date.
      start_date: '2022-10-03'
    max_active_runs: 1
    schedule_interval: "@daily"
    catchup: false
    default_view: graph

  tasks:
    # Task 1: download SOURCE_URL, transform it, and write the result to
    # TARGET_GCS_PATH in the Composer bucket.
    - operator: "KubernetesPodOperator"
      description: "Run CSV transform within kubernetes pod"
      args:
        task_id: "data_by_province_transform_csv"
        startup_timeout_seconds: 600
        name: "covid19_italy_data_by_province"
        namespace: "composer-user-workloads"
        service_account_name: "default"
        config_file: "/home/airflow/composer_kube_config"
        image_pull_policy: "Always"
        image: "{{ var.json.covid19_italy.container_registry.run_csv_transform_kub }}"
        # Environment passed to the transform container. CSV_HEADERS and
        # RENAME_MAPPINGS are JSON documents carried as strings; ">-" keeps
        # each one as a single line with no trailing newline.
        env_vars:
          SOURCE_URL: "https://raw.githubusercontent.com/pcm-dpc/COVID-19/master/dati-province/dpc-covid19-ita-province.csv"
          SOURCE_FILE: "files/data.csv"
          TARGET_FILE: "files/data_output.csv"
          TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
          TARGET_GCS_PATH: "data/covid19_italy/data_by_province/data_output.csv"
          CSV_HEADERS: >-
            ["date","country","region_code","region_name","province_code","province_name","province_abbreviation","latitude","longitude","location_geom","confirmed_cases","note"]
          RENAME_MAPPINGS: >-
            {"data": "date","stato": "country","codice_regione": "region_code","denominazione_regione": "region_name","lat": "latitude","long": "longitude","codice_provincia": "province_code","denominazione_provincia": "province_name","sigla_provincia": "province_abbreviation","totale_casi": "confirmed_cases","note": "note"}
          PIPELINE_NAME: "data_by_province"
        # Resource requests for the pod. Values are quoted so they remain
        # strings ("2" would otherwise parse as an integer).
        container_resources:
          memory:
            request: "80Gi"
          cpu:
            request: "2"
          ephemeral-storage:
            request: "10Gi"

    # Task 2: load the transformed CSV from the Composer bucket into
    # covid19_italy.data_by_province, replacing the table contents.
    - operator: "GoogleCloudStorageToBigQueryOperator"
      description: "Task to load CSV data to a BigQuery table"
      args:
        task_id: "load_data_by_province_to_bq"
        bucket: "{{ var.value.composer_bucket }}"
        source_objects: ["data/covid19_italy/data_by_province/data_output.csv"]
        source_format: "CSV"
        destination_project_dataset_table: "covid19_italy.data_by_province"
        # The first row of the file is skipped on load.
        skip_leading_rows: 1
        write_disposition: "WRITE_TRUNCATE"
        # Columns appear in the same order as CSV_HEADERS in the transform
        # task.
        schema_fields:
          - name: "date"
            type: "TIMESTAMP"
            mode: "NULLABLE"
          - name: "country"
            type: "STRING"
            mode: "NULLABLE"
          - name: "region_code"
            type: "STRING"
            mode: "NULLABLE"
          # NOTE(review): CSV_HEADERS calls this fourth column "region_name",
          # but the table column is declared as "name" — confirm this is the
          # intended published column name before renaming either side.
          - name: "name"
            type: "STRING"
            mode: "NULLABLE"
          - name: "province_code"
            type: "STRING"
            mode: "NULLABLE"
          - name: "province_name"
            type: "STRING"
            mode: "NULLABLE"
          - name: "province_abbreviation"
            type: "STRING"
            mode: "NULLABLE"
          - name: "latitude"
            type: "FLOAT"
            mode: "NULLABLE"
          - name: "longitude"
            type: "FLOAT"
            mode: "NULLABLE"
          - name: "location_geom"
            type: "GEOGRAPHY"
            mode: "NULLABLE"
          - name: "confirmed_cases"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "note"
            type: "STRING"
            mode: "NULLABLE"

    # Task 3: copy the transformed CSV out of the Composer bucket to the
    # destination held in the covid19_italy Airflow variable.
    - operator: "BashOperator"
      description: "Task to copy bq uploadable data file to bucket in EU"
      args:
        task_id: "copy_data_file_EU"
        bash_command: "gsutil cp gs://{{ var.value.composer_bucket }}/data/covid19_italy/data_by_province/data_output.csv {{ var.json.covid19_italy.destination_gcs_province }}"

    # Task 4: load the copied CSV into covid19_italy_eu.data_by_province,
    # replacing the table contents.
    # NOTE(review): assumes destination_gcs_province (copy target above)
    # resolves to province/data_output.csv inside destination_gcs_bucket —
    # confirm against the Airflow variable.
    - operator: "GoogleCloudStorageToBigQueryOperator"
      description: "Task to load CSV data to a BigQuery table"
      args:
        task_id: "load_data_by_province_to_bq_eu"
        bucket: "{{ var.json.covid19_italy.destination_gcs_bucket }}"
        # A list, as in load_data_by_province_to_bq; a bare string could be
        # iterated character by character by the operator.
        source_objects: ["province/data_output.csv"]
        source_format: "CSV"
        destination_project_dataset_table: "covid19_italy_eu.data_by_province"
        # The first row of the file is skipped on load.
        skip_leading_rows: 1
        write_disposition: "WRITE_TRUNCATE"
        # Same column list as the load_data_by_province_to_bq task.
        schema_fields:
          - name: "date"
            type: "TIMESTAMP"
            mode: "NULLABLE"
          - name: "country"
            type: "STRING"
            mode: "NULLABLE"
          - name: "region_code"
            type: "STRING"
            mode: "NULLABLE"
          # NOTE(review): CSV_HEADERS calls this fourth column "region_name",
          # but the table column is declared as "name" — confirm this is the
          # intended published column name before renaming either side.
          - name: "name"
            type: "STRING"
            mode: "NULLABLE"
          - name: "province_code"
            type: "STRING"
            mode: "NULLABLE"
          - name: "province_name"
            type: "STRING"
            mode: "NULLABLE"
          - name: "province_abbreviation"
            type: "STRING"
            mode: "NULLABLE"
          - name: "latitude"
            type: "FLOAT"
            mode: "NULLABLE"
          - name: "longitude"
            type: "FLOAT"
            mode: "NULLABLE"
          - name: "location_geom"
            type: "GEOGRAPHY"
            mode: "NULLABLE"
          - name: "confirmed_cases"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "note"
            type: "STRING"
            mode: "NULLABLE"

  # Execution order: the transform runs first, then the copy, then both
  # BigQuery load tasks (the bracketed pair follows the copy).
  graph_paths:
    - "data_by_province_transform_csv >> copy_data_file_EU >> [load_data_by_province_to_bq, load_data_by_province_to_bq_eu]"