datasets/mnist/pipelines/mnist/mnist_dag.py (138 lines of code) (raw):

# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from airflow import DAG from airflow.providers.google.cloud.operators import kubernetes_engine default_args = { "owner": "Google", "depends_on_past": False, "start_date": "2022-06-10", } with DAG( dag_id="mnist.mnist", default_args=default_args, max_active_runs=1, schedule_interval="@weekly", catchup=False, default_view="graph", ) as dag: create_cluster = kubernetes_engine.GKECreateClusterOperator( task_id="create_cluster", project_id="{{ var.value.gcp_project }}", location="us-central1-c", body={ "name": "pdp-mnist", "initial_node_count": 1, "network": "{{ var.value.vpc_network }}", "node_config": { "machine_type": "e2-standard-16", "oauth_scopes": [ "https://www.googleapis.com/auth/devstorage.read_write", "https://www.googleapis.com/auth/cloud-platform", ], }, }, ) # Task to copy `t10k-images-idx3-ubyte.gz` from MNIST Database to GCS download_and_process_source_zip_file = kubernetes_engine.GKEStartPodOperator( task_id="download_and_process_source_zip_file", name="mnist", namespace="default", project_id="{{ var.value.gcp_project }}", location="us-central1-c", cluster_name="pdp-mnist", image_pull_policy="Always", image="{{ var.json.mnist.container_registry.run_csv_transform_kub }}", env_vars={ "SOURCE_URL": "http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz", "SOURCE_FILE": "files/t10k-images-idx3-ubyte.gz", "TARGET_FILE": "files/t10k-images-idx3-ubyte.gz", "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", "TARGET_GCS_PATH": "data/mnist/mnist/t10k-images-idx3-ubyte.gz", "PIPELINE_NAME": "mnist", }, container_resources={ "memory": {"request": "16Gi"}, "cpu": {"request": "1"}, "ephemeral-storage": {"request": "10Gi"}, }, ) # Task to copy `train-images-idx3-ubyte.gz` from MNIST Database to GCS download_and_process_source_zip_file_2 = kubernetes_engine.GKEStartPodOperator( task_id="download_and_process_source_zip_file_2", name="mnist", namespace="default", project_id="{{ var.value.gcp_project }}", location="us-central1-c", cluster_name="pdp-mnist", image_pull_policy="Always", image="{{ var.json.mnist.container_registry.run_csv_transform_kub }}", env_vars={ "SOURCE_URL": "http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz", "SOURCE_FILE": "files/train-images-idx3-ubyte.gz", "TARGET_FILE": "files/train-images-idx3-ubyte.gz", "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", "TARGET_GCS_PATH": "data/mnist/mnist/train-images-idx3-ubyte.gz", "PIPELINE_NAME": "mnist", }, container_resources={ "memory": {"request": "16Gi"}, "cpu": {"request": "1"}, "ephemeral-storage": {"request": "10Gi"}, }, ) # Task to copy `train-labels-idx1-ubyte.gz` from MNIST Database to GCS download_and_process_source_zip_file_3 = kubernetes_engine.GKEStartPodOperator( task_id="download_and_process_source_zip_file_3", name="mnist", namespace="default", project_id="{{ var.value.gcp_project }}", location="us-central1-c", cluster_name="pdp-mnist", image_pull_policy="Always", image="{{ var.json.mnist.container_registry.run_csv_transform_kub }}", env_vars={ "SOURCE_URL": "http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz", "SOURCE_FILE": "files/train-labels-idx1-ubyte.gz", "TARGET_FILE": "files/train-labels-idx1-ubyte.gz", "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", "TARGET_GCS_PATH": "data/mnist/mnist/train-labels-idx1-ubyte.gz", "PIPELINE_NAME": "mnist", }, container_resources={ "memory": {"request": "16Gi"}, "cpu": {"request": "1"}, "ephemeral-storage": {"request": "10Gi"}, }, ) # Task to copy `t10k-labels-idx1-ubyte.gz` from MNIST Database to GCS download_and_process_source_zip_file_4 = kubernetes_engine.GKEStartPodOperator( task_id="download_and_process_source_zip_file_4", name="mnist", namespace="default", project_id="{{ var.value.gcp_project }}", location="us-central1-c", cluster_name="pdp-mnist", image_pull_policy="Always", image="{{ var.json.mnist.container_registry.run_csv_transform_kub }}", env_vars={ "SOURCE_URL": "http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz", "SOURCE_FILE": "files/t10k-labels-idx1-ubyte.gz", "TARGET_FILE": "files/t10k-labels-idx1-ubyte.gz", "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", "TARGET_GCS_PATH": "data/mnist/mnist/t10k-labels-idx1-ubyte.gz", "PIPELINE_NAME": "mnist", }, container_resources={ "memory": {"request": "16Gi"}, "cpu": {"request": "1"}, "ephemeral-storage": {"request": "10Gi"}, }, ) delete_cluster = kubernetes_engine.GKEDeleteClusterOperator( task_id="delete_cluster", project_id="{{ var.value.gcp_project }}", location="us-central1-c", name="pdp-mnist", ) ( create_cluster >> download_and_process_source_zip_file >> download_and_process_source_zip_file_2 >> download_and_process_source_zip_file_3 >> download_and_process_source_zip_file_4 >> delete_cluster )