cloud-composer/dags/step-01-taxi-data-download.py

####################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################

# Author: Adam Paternostro
# Summary: Download 2019 through 2024 NYC taxi trip data and upload it to Google Cloud Storage

# [START dag]
from google.cloud import storage
from datetime import datetime, timedelta
import requests
import sys
import os
import logging
import airflow
# from airflow.operators import bash_operator
# from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
# from airflow.contrib.operators import bigquery_operator
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': None,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    'dagrun_timeout': timedelta(minutes=60),
}

project_id = os.environ['ENV_PROJECT_ID']
raw_bucket_name = os.environ['ENV_RAW_BUCKET']


# Download taxi data from the internet
def DownloadFile(url):
    print("Begin: DownloadFile")
    print("url: ", url)
    r = requests.get(url, allow_redirects=True)
    # Use the last path segment of the URL as the local file name
    fileName = url[url.rindex('/') + 1:]
    print("fileName: ", fileName)
    if r.status_code == requests.codes.ok:
        with open(fileName, 'wb') as f:
            f.write(r.content)
    else:
        raise ValueError("Could not download file: ", fileName)
    print("End: DownloadFile")
    return fileName


# https://cloud.google.com/storage/docs/uploading-objects#prereq-code-samples
def upload_blob(project, raw_bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # project = your GCP project id
    # raw_bucket_name = "your-bucket-name" (the ID of your GCS bucket)
    # source_file_name = "local/path/to/file" (the path to the file to upload)
    # destination_blob_name = "storage-object-name" (the ID of your GCS object)
    print("Begin: upload_blob")
    print("project: ", project)
    print("raw_bucket_name: ", raw_bucket_name)
    print("source_file_name: ", source_file_name)
    print("destination_blob_name: ", destination_blob_name)
    storage_client = storage.Client(project=project)
    bucket = storage_client.bucket(raw_bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(filename=source_file_name)
    print("File {} uploaded to {}.".format(source_file_name, destination_blob_name))
    print("End: upload_blob")


# Example standalone invocation:
# python3 download-taxi-data.py "big-query-demo-09" "big-query-demo-09" "yellow" "2021" "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet"
def download_and_upload_to_gcs(project, raw_bucket_name, color, year, url, max_month):
    print("Begin: download_and_upload_to_gcs")
    for month_index in range(max_month):
        # Fill in the URL template, e.g. {MONTH} becomes "01".."12"
        downloadURL = url.replace("{COLOR}", color).replace("{YEAR}", year).replace("{MONTH}", "%02d" % (month_index + 1))
        print("downloadURL:", downloadURL)
        try:
            source_file_name = DownloadFile(downloadURL)
year + "/" + source_file_name upload_blob(project, raw_bucket_name, source_file_name, destination_blob_name) os.remove(source_file_name) except Exception as e: print(e) print("Skipping file: ", downloadURL) print ("End: download_and_upload_to_gcs") with airflow.DAG('step-01-taxi-data-download', default_args=default_args, start_date=datetime(2021, 1, 1), # Not scheduled, trigger only schedule_interval=None) as dag: download_yellow_2024 = PythonOperator( task_id='download_yellow_2024', python_callable= download_and_upload_to_gcs, op_kwargs = { "project" : project_id, "raw_bucket_name" : raw_bucket_name, "color" : "yellow", "year" : "2024", "url" : "https://d37ci6vzurychx.cloudfront.net/trip-data/{COLOR}_tripdata_{YEAR}-{MONTH}.parquet", "max_month" : 9 }, execution_timeout=timedelta(minutes=30), dag=dag, ) download_yellow_2023 = PythonOperator( task_id='download_yellow_2023', python_callable= download_and_upload_to_gcs, op_kwargs = { "project" : project_id, "raw_bucket_name" : raw_bucket_name, "color" : "yellow", "year" : "2023", "url" : "https://d37ci6vzurychx.cloudfront.net/trip-data/{COLOR}_tripdata_{YEAR}-{MONTH}.parquet", "max_month" : 12 }, execution_timeout=timedelta(minutes=30), dag=dag, ) download_green_2024 = PythonOperator( task_id='download_green_2024', python_callable= download_and_upload_to_gcs, op_kwargs = { "project" : project_id, "raw_bucket_name" : raw_bucket_name, "color" : "green", "year" : "2024", "url" : "https://d37ci6vzurychx.cloudfront.net/trip-data/{COLOR}_tripdata_{YEAR}-{MONTH}.parquet", "max_month" : 9 }, execution_timeout=timedelta(minutes=30), dag=dag, ) download_green_2023 = PythonOperator( task_id='download_green_2023', python_callable= download_and_upload_to_gcs, op_kwargs = { "project" : project_id, "raw_bucket_name" : raw_bucket_name, "color" : "green", "year" : "2023", "url" : "https://d37ci6vzurychx.cloudfront.net/trip-data/{COLOR}_tripdata_{YEAR}-{MONTH}.parquet", "max_month" : 12 }, execution_timeout=timedelta(minutes=30), dag=dag, ) download_yellow_2022 = PythonOperator( task_id='download_yellow_2022', python_callable= download_and_upload_to_gcs, op_kwargs = { "project" : project_id, "raw_bucket_name" : raw_bucket_name, "color" : "yellow", "year" : "2022", "url" : "https://d37ci6vzurychx.cloudfront.net/trip-data/{COLOR}_tripdata_{YEAR}-{MONTH}.parquet", "max_month" : 12 }, execution_timeout=timedelta(minutes=30), dag=dag, ) download_green_2022 = PythonOperator( task_id='download_green_2022', python_callable= download_and_upload_to_gcs, op_kwargs = { "project" : project_id, "raw_bucket_name" : raw_bucket_name, "color" : "green", "year" : "2022", "url" : "https://d37ci6vzurychx.cloudfront.net/trip-data/{COLOR}_tripdata_{YEAR}-{MONTH}.parquet", "max_month" : 12 }, execution_timeout=timedelta(minutes=30), dag=dag, ) download_yellow_2021 = PythonOperator( task_id='download_yellow_2021', python_callable= download_and_upload_to_gcs, op_kwargs = { "project" : project_id, "raw_bucket_name" : raw_bucket_name, "color" : "yellow", "year" : "2021", "url" : "https://d37ci6vzurychx.cloudfront.net/trip-data/{COLOR}_tripdata_{YEAR}-{MONTH}.parquet", "max_month" : 12 }, execution_timeout=timedelta(minutes=30), dag=dag, ) download_green_2021 = PythonOperator( task_id='download_green_2021', python_callable= download_and_upload_to_gcs, op_kwargs = { "project" : project_id, "raw_bucket_name" : raw_bucket_name, "color" : "green", "year" : "2021", "url" : "https://d37ci6vzurychx.cloudfront.net/trip-data/{COLOR}_tripdata_{YEAR}-{MONTH}.parquet", "max_month" : 12 }, 
        execution_timeout=timedelta(minutes=30),
        dag=dag,
    )

    download_yellow_2020 = PythonOperator(
        task_id='download_yellow_2020',
        python_callable=download_and_upload_to_gcs,
        op_kwargs={
            "project": project_id,
            "raw_bucket_name": raw_bucket_name,
            "color": "yellow",
            "year": "2020",
            "url": "https://d37ci6vzurychx.cloudfront.net/trip-data/{COLOR}_tripdata_{YEAR}-{MONTH}.parquet",
            "max_month": 12
        },
        execution_timeout=timedelta(minutes=30),
        dag=dag,
    )

    download_green_2020 = PythonOperator(
        task_id='download_green_2020',
        python_callable=download_and_upload_to_gcs,
        op_kwargs={
            "project": project_id,
            "raw_bucket_name": raw_bucket_name,
            "color": "green",
            "year": "2020",
            "url": "https://d37ci6vzurychx.cloudfront.net/trip-data/{COLOR}_tripdata_{YEAR}-{MONTH}.parquet",
            "max_month": 12
        },
        execution_timeout=timedelta(minutes=30),
        dag=dag,
    )

    download_yellow_2019 = PythonOperator(
        task_id='download_yellow_2019',
        python_callable=download_and_upload_to_gcs,
        op_kwargs={
            "project": project_id,
            "raw_bucket_name": raw_bucket_name,
            "color": "yellow",
            "year": "2019",
            "url": "https://d37ci6vzurychx.cloudfront.net/trip-data/{COLOR}_tripdata_{YEAR}-{MONTH}.parquet",
            "max_month": 12
        },
        execution_timeout=timedelta(minutes=30),
        dag=dag,
    )

    download_green_2019 = PythonOperator(
        task_id='download_green_2019',
        python_callable=download_and_upload_to_gcs,
        op_kwargs={
            "project": project_id,
            "raw_bucket_name": raw_bucket_name,
            "color": "green",
            "year": "2019",
            "url": "https://d37ci6vzurychx.cloudfront.net/trip-data/{COLOR}_tripdata_{YEAR}-{MONTH}.parquet",
            "max_month": 12
        },
        execution_timeout=timedelta(minutes=30),
        dag=dag,
    )

    # Do not run in parallel since the worker node will run out of disk space.
    # If you had more workers then yes, run in parallel.
    download_green_2024 >> download_green_2023 >> download_yellow_2024 >> download_yellow_2023 >> \
        download_green_2022 >> download_yellow_2022 >> \
        download_green_2021 >> download_yellow_2021 >> \
        download_green_2020 >> download_yellow_2020 >> \
        download_green_2019 >> download_yellow_2019

# [END dag]
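
# ----------------------------------------------------------------------------------
# Manual trigger (a sketch, not part of the original DAG): because schedule_interval
# is None, this DAG only runs when triggered. Assuming a Cloud Composer environment
# running Airflow 2, it could be triggered from the gcloud CLI roughly as shown
# below, where ENVIRONMENT_NAME and LOCATION are placeholders for your environment:
#
#   gcloud composer environments run ENVIRONMENT_NAME \
#       --location LOCATION \
#       dags trigger -- step-01-taxi-data-download
# ----------------------------------------------------------------------------------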