datasets/covid19_italy/pipelines/data_by_region/pipeline.yaml (194 lines of code) (raw):
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
---
# BigQuery resources declared by this pipeline: the same table is listed
# twice, once without a dataset_id and once in the covid19_italy_eu dataset.
resources:
  # No dataset_id is set on this entry.
  # NOTE(review): presumably the tooling falls back to a default dataset
  # (the load task targets covid19_italy) - confirm.
  - type: "bigquery_table"
    table_id: "data_by_region"
    description: "COVID-19 Italy Data By Region"

  # EU copy of the table, with its dataset named explicitly.
  - type: "bigquery_table"
    dataset_id: "covid19_italy_eu"
    table_id: "data_by_region"
    description: "COVID-19 Italy Data By Region"
# DAG definition for the "data_by_region" pipeline. It has four tasks:
#   1. data_by_region_transform_csv  - runs the CSV transform in a pod
#   2. load_data_by_region_to_bq     - loads the CSV into covid19_italy
#   3. copy_data_file_EU             - copies the CSV to the EU GCS location
#   4. load_data_by_region_to_bq_eu  - loads the CSV into covid19_italy_eu
# Ordering between the tasks is set by graph_paths at the bottom.
dag:
  airflow_version: 2

  initialize:
    dag_id: data_by_region
    default_args:
      owner: "Google"
      depends_on_past: false
      # Quoted so the value stays a string rather than a YAML date.
      start_date: '2022-10-03'
    max_active_runs: 1
    schedule_interval: "@daily"
    catchup: false
    default_view: graph

  tasks:
    # Task 1: transform the upstream regional CSV inside a Kubernetes pod.
    - operator: "KubernetesPodOperator"
      description: "Run CSV transform within kubernetes pod"
      args:
        task_id: "data_by_region_transform_csv"
        startup_timeout_seconds: 600
        name: "covid19_italy_data_by_region"
        namespace: "composer-user-workloads"
        service_account_name: "default"
        config_file: "/home/airflow/composer_kube_config"
        image_pull_policy: "Always"
        image: "{{ var.json.covid19_italy.container_registry.run_csv_transform_kub }}"
        # Environment passed to the transform container. The two JSON values
        # are kept on a single line each so the folded scalar yields exactly
        # the original string.
        env_vars:
          SOURCE_URL: "https://raw.githubusercontent.com/pcm-dpc/COVID-19/master/dati-regioni/dpc-covid19-ita-regioni.csv"
          SOURCE_FILE: "files/data.csv"
          TARGET_FILE: "files/data_output.csv"
          TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
          TARGET_GCS_PATH: "data/covid19_italy/data_by_region/data_output.csv"
          # Output column names; they line up with schema_fields in the
          # BigQuery load tasks below.
          CSV_HEADERS: >-
            ["date","country","region_code","region_name","latitude","longitude","location_geom","hospitalized_patients_symptoms","hospitalized_patients_intensive_care","total_hospitalized_patients","home_confinement_cases","total_current_confirmed_cases","new_current_confirmed_cases","new_total_confirmed_cases","recovered","deaths","total_confirmed_cases","tests_performed","note"]
          # Source (Italian) column name -> output column name.
          RENAME_MAPPINGS: >-
            {"data": "date","stato": "country","codice_regione": "region_code","denominazione_regione": "region_name","lat": "latitude","long": "longitude","ricoverati_con_sintomi": "hospitalized_patients_symptoms","terapia_intensiva": "hospitalized_patients_intensive_care","totale_ospedalizzati": "total_hospitalized_patients","isolamento_domiciliare": "home_confinement_cases","totale_positivi": "total_current_confirmed_cases","variazione_totale_positivi": "new_current_confirmed_cases","nuovi_positivi": "new_total_confirmed_cases","note": "note","dimessi_guariti": "recovered","totale_casi": "total_confirmed_cases","tamponi": "tests_performed","deceduti": "deaths"}
          PIPELINE_NAME: "data_by_region"
        # Resource requests for the pod; values are quoted so they stay strings.
        container_resources:
          memory:
            request: "32Gi"
          cpu:
            request: "2"
          ephemeral-storage:
            request: "10Gi"

    # Task 2: load the transformed CSV from the composer bucket into
    # covid19_italy.data_by_region, replacing existing rows (WRITE_TRUNCATE).
    - operator: "GoogleCloudStorageToBigQueryOperator"
      description: "Task to load CSV data to a BigQuery table"
      args:
        task_id: "load_data_by_region_to_bq"
        bucket: "{{ var.value.composer_bucket }}"
        source_objects: ["data/covid19_italy/data_by_region/data_output.csv"]
        source_format: "CSV"
        destination_project_dataset_table: "covid19_italy.data_by_region"
        # Skip the CSV header row.
        skip_leading_rows: 1
        write_disposition: "WRITE_TRUNCATE"
        schema_fields:
          - name: "date"
            type: "TIMESTAMP"
            mode: "NULLABLE"
          - name: "country"
            type: "STRING"
            mode: "NULLABLE"
          - name: "region_code"
            type: "STRING"
            mode: "NULLABLE"
          - name: "region_name"
            type: "STRING"
            mode: "NULLABLE"
          - name: "latitude"
            type: "FLOAT"
            mode: "NULLABLE"
          - name: "longitude"
            type: "FLOAT"
            mode: "NULLABLE"
          - name: "location_geom"
            type: "GEOGRAPHY"
            mode: "NULLABLE"
          - name: "hospitalized_patients_symptoms"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "hospitalized_patients_intensive_care"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "total_hospitalized_patients"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "home_confinement_cases"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "total_current_confirmed_cases"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "new_current_confirmed_cases"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "new_total_confirmed_cases"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "recovered"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "deaths"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "total_confirmed_cases"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "tests_performed"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "note"
            type: "STRING"
            mode: "NULLABLE"

    # Task 3: copy the transformed CSV from the composer bucket to the
    # destination given by the covid19_italy.destination_gcs_region variable.
    - operator: "BashOperator"
      description: "Task to copy bq uploadable data file to bucket in EU"
      args:
        task_id: "copy_data_file_EU"
        bash_command: "gsutil cp gs://{{ var.value.composer_bucket }}/data/covid19_italy/data_by_region/data_output.csv {{ var.json.covid19_italy.destination_gcs_region }}"

    # Task 4: load the copied CSV into covid19_italy_eu.data_by_region,
    # replacing existing rows (WRITE_TRUNCATE). Same schema as task 2.
    - operator: "GoogleCloudStorageToBigQueryOperator"
      description: "Task to load CSV data to a BigQuery table"
      args:
        task_id: "load_data_by_region_to_bq_eu"
        bucket: "{{ var.json.covid19_italy.destination_gcs_bucket }}"
        # Given as a list, like the other load task: a bare string is not
        # accepted uniformly across operator versions.
        # NOTE(review): this path must match where copy_data_file_EU writes;
        # that depends on the destination_gcs_region variable - confirm.
        source_objects: ["region/data_output.csv"]
        source_format: "CSV"
        destination_project_dataset_table: "covid19_italy_eu.data_by_region"
        # Skip the CSV header row.
        skip_leading_rows: 1
        write_disposition: "WRITE_TRUNCATE"
        schema_fields:
          - name: "date"
            type: "TIMESTAMP"
            mode: "NULLABLE"
          - name: "country"
            type: "STRING"
            mode: "NULLABLE"
          - name: "region_code"
            type: "STRING"
            mode: "NULLABLE"
          - name: "region_name"
            type: "STRING"
            mode: "NULLABLE"
          - name: "latitude"
            type: "FLOAT"
            mode: "NULLABLE"
          - name: "longitude"
            type: "FLOAT"
            mode: "NULLABLE"
          - name: "location_geom"
            type: "GEOGRAPHY"
            mode: "NULLABLE"
          - name: "hospitalized_patients_symptoms"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "hospitalized_patients_intensive_care"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "total_hospitalized_patients"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "home_confinement_cases"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "total_current_confirmed_cases"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "new_current_confirmed_cases"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "new_total_confirmed_cases"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "recovered"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "deaths"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "total_confirmed_cases"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "tests_performed"
            type: "INTEGER"
            mode: "NULLABLE"
          - name: "note"
            type: "STRING"
            mode: "NULLABLE"

  # Task ordering: transform, then the EU copy, then both BigQuery loads.
  # NOTE(review): the non-EU load is ordered after the EU copy even though it
  # reads from the composer bucket - confirm this coupling is intended.
  graph_paths:
    - "data_by_region_transform_csv >> copy_data_file_EU >> [load_data_by_region_to_bq, load_data_by_region_to_bq_eu]"