datasets/geos_fp/pipelines/copy_files_rolling_basis/pipeline.yaml (245 lines of code) (raw):

# Copyright 2021 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. --- resources: ~ dag: airflow_version: 2 initialize: dag_id: copy_files_rolling_basis default_args: owner: "Google" depends_on_past: False start_date: '2021-06-01' max_active_runs: 1 schedule_interval: "0 2 * * *" # Daily at 2am UTC catchup: False default_view: "graph" tasks: - operator: "GKECreateClusterOperator" args: task_id: "create_cluster" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" body: name: geos-fp--copy-files-rolling-basis initial_node_count: 2 network: "{{ var.value.vpc_network }}" ip_allocation_policy: cluster_ipv4_cidr_block: "/26" node_config: machine_type: e2-standard-8 oauth_scopes: - https://www.googleapis.com/auth/devstorage.read_write - https://www.googleapis.com/auth/cloud-platform - operator: "GKEStartPodOperator" description: "Copy files to GCS on the specified date" args: task_id: "copy_files_dated_today" name: "geosfp" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: geos-fp--copy-files-rolling-basis namespace: "default" image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" image_pull_policy: "Always" env_vars: BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das" TODAY_DIFF: "0" DOWNLOAD_DIR: "/geos_fp/data" TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}" BATCH_SIZE: "10" resources: request_memory: "1G" request_cpu: "1" retries: 3 retry_delay: 300 retry_exponential_backoff: true startup_timeout_seconds: 600 - operator: "GKEStartPodOperator" description: "Copy files to GCS on the specified date" args: task_id: "copy_files_dated_today_minus_1_day" name: "geosfp" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: geos-fp--copy-files-rolling-basis namespace: "default" image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" image_pull_policy: "Always" env_vars: BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das" TODAY_DIFF: "1" DOWNLOAD_DIR: "/geos_fp/data" TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}" BATCH_SIZE: "10" resources: request_memory: "1G" request_cpu: "1" retries: 3 retry_delay: 300 retry_exponential_backoff: true startup_timeout_seconds: 600 - operator: "GKEStartPodOperator" description: "Copy files to GCS on the specified date" args: task_id: "copy_files_dated_today_minus_2_days" name: "geosfp" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: geos-fp--copy-files-rolling-basis namespace: "default" image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" image_pull_policy: "Always" env_vars: BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das" TODAY_DIFF: "2" DOWNLOAD_DIR: "/geos_fp/data" TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}" BATCH_SIZE: "10" resources: request_memory: "1G" request_cpu: "1" retries: 3 retry_delay: 300 retry_exponential_backoff: true startup_timeout_seconds: 600 - operator: "GKEStartPodOperator" description: "Copy files to GCS on a 10-day rolling basis" args: task_id: "copy_files_dated_today_minus_3_days" name: "geosfp" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: geos-fp--copy-files-rolling-basis namespace: "default" image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" image_pull_policy: "Always" env_vars: BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das" TODAY_DIFF: "3" DOWNLOAD_DIR: "/geos_fp/data" TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}" BATCH_SIZE: "10" resources: request_memory: "1G" request_cpu: "1" retries: 3 retry_delay: 300 retry_exponential_backoff: true startup_timeout_seconds: 600 - operator: "GKEStartPodOperator" description: "Copy files to GCS on a 10-day rolling basis" args: task_id: "copy_files_dated_today_minus_4_days" name: "geosfp" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: geos-fp--copy-files-rolling-basis namespace: "default" image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" image_pull_policy: "Always" env_vars: BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das" TODAY_DIFF: "4" DOWNLOAD_DIR: "/geos_fp/data" TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}" BATCH_SIZE: "10" resources: request_memory: "1G" request_cpu: "1" retries: 3 retry_delay: 300 retry_exponential_backoff: true startup_timeout_seconds: 600 - operator: "GKEStartPodOperator" description: "Copy files to GCS on a 10-day rolling basis" args: task_id: "copy_files_dated_today_minus_5_days" name: "geosfp" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: geos-fp--copy-files-rolling-basis namespace: "default" image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" image_pull_policy: "Always" env_vars: BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das" TODAY_DIFF: "5" DOWNLOAD_DIR: "/geos_fp/data" TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}" BATCH_SIZE: "10" resources: request_memory: "1G" request_cpu: "1" retries: 3 retry_delay: 300 retry_exponential_backoff: true startup_timeout_seconds: 600 - operator: "GKEStartPodOperator" description: "Copy files to GCS on a 10-day rolling basis" args: task_id: "copy_files_dated_today_minus_6_days" name: "geosfp" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: geos-fp--copy-files-rolling-basis namespace: "default" image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" image_pull_policy: "Always" env_vars: BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das" TODAY_DIFF: "6" DOWNLOAD_DIR: "/geos_fp/data" TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}" BATCH_SIZE: "10" resources: request_memory: "1G" request_cpu: "1" retries: 3 retry_delay: 300 retry_exponential_backoff: true startup_timeout_seconds: 600 - operator: "GKEStartPodOperator" description: "Copy files to GCS on a 10-day rolling basis" args: task_id: "copy_files_dated_today_minus_7_days" name: "geosfp" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: geos-fp--copy-files-rolling-basis namespace: "default" image: "{{ var.json.geos_fp.container_registry.rolling_copy }}" image_pull_policy: "Always" env_vars: BASE_URL: "https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das" TODAY_DIFF: "7" DOWNLOAD_DIR: "/geos_fp/data" TARGET_BUCKET: "{{ var.json.geos_fp.destination_bucket }}" BATCH_SIZE: "10" resources: request_memory: "1G" request_cpu: "1" retries: 3 retry_delay: 300 retry_exponential_backoff: true startup_timeout_seconds: 600 - operator: "GoogleCloudStorageDeleteOperator" description: "Deletes GCS data more than 7 days ago" args: task_id: "delete_old_data" bucket_name: "{{ var.json.geos_fp.destination_bucket }}" prefix: "{{ macros.ds_format(macros.ds_add(ds, -8), '%Y-%m-%d', 'Y%Y/M%m/D%d') }}" - operator: "GKEDeleteClusterOperator" args: task_id: "delete_cluster" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" name: geos-fp--copy-files-rolling-basis graph_paths: - "delete_old_data" - "create_cluster >> copy_files_dated_today >> delete_cluster" - "create_cluster >> copy_files_dated_today_minus_1_day >> delete_cluster" - "create_cluster >> copy_files_dated_today_minus_2_days >> delete_cluster" - "create_cluster >> copy_files_dated_today_minus_3_days >> delete_cluster" - "create_cluster >> copy_files_dated_today_minus_4_days >> delete_cluster" - "create_cluster >> copy_files_dated_today_minus_5_days >> delete_cluster" - "create_cluster >> copy_files_dated_today_minus_6_days >> delete_cluster" - "create_cluster >> copy_files_dated_today_minus_7_days >> delete_cluster"