datasets/geos_fp/pipelines/_images/rolling_copy/script.py

# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
import pathlib
import subprocess
import typing
from datetime import date, timedelta

import bs4
import requests
from google.cloud import storage

# The manifest file contains a list of files already downloaded for a given date
MANIFEST_FILE = "manifest.txt"


def main(
    base_url: str,
    dt: date,
    download_dir: pathlib.Path,
    target_bucket: str,
    batch_size: int,
) -> None:
    # Get the date prefix, e.g. Y2021/M01/D01, and create a local directory for it
    date_prefix = _date_prefix(dt)
    (download_dir / date_prefix).mkdir(parents=True, exist_ok=True)

    # Collect the set of all .nc4 files listed at the source URL for this date
    all_files = get_all_files(base_url, date_prefix)
    stored_files = get_stored_files(target_bucket, date_prefix, download_dir)

    # Files present on the source webpage but not yet stored on GCS
    unstored_files = all_files - stored_files

    download_and_store_new_files(
        download_dir, date_prefix, unstored_files, batch_size, target_bucket
    )


def _date_prefix(dt: date) -> str:
    # Generates URL paths to folders containing the .nc4 files, for example
    # https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das/Y2021/M01/D01/
    # => Y2021/M01/D01
    return f"Y{dt.year}/M{dt.month:0>2}/D{dt.day:0>2}"


def get_all_files(base_url: str, date_prefix: str) -> typing.Set[str]:
    """Scrape the source listing page and return the set of .nc4 file paths."""
    all_files = set()
    url = f"{base_url}/{date_prefix}"

    response = requests.get(url)
    if response.status_code == 200:
        logging.info(f"Scraping .nc4 files in {url}")
        webpage = bs4.BeautifulSoup(response.text, "html.parser")
        all_files.update(scrape(date_prefix, webpage))
    else:
        logging.warning(f"The following URL doesn't exist, will try again later: {url}")

    return all_files


def get_stored_files(
    bucket_name: str, date_prefix: str, download_dir: pathlib.Path
) -> typing.Set[str]:
    """Return the set of files already uploaded, as recorded in the manifest on GCS.

    If no manifest exists yet for this date, an empty local manifest is created
    and an empty set is returned.
    """
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    gcs_object = f"{date_prefix}/{MANIFEST_FILE}"
    local_object = download_dir / MANIFEST_FILE
    if storage.Blob(bucket=bucket, name=gcs_object).exists(storage_client):
        logging.info(f"Manifest file found at gs://{bucket_name}/{gcs_object}")
        blob = bucket.blob(gcs_object)
        blob.download_to_filename(str(local_object))
    else:
        local_object.touch()

    with open(local_object) as f:
        return set(f.read().splitlines())


def scrape(source_path: str, webpage: bs4.BeautifulSoup) -> typing.List[str]:
    file_paths = []

    # Go through all the URLs in the page and collect the ones ending in ".nc4"
    for a_tag in webpage.find_all("a"):
        # The `href` property is the filename,
        # e.g. GEOS.fp.asm.inst1_2d_smp_Nx.20210101_1700.V01.nc4
        if a_tag.get("href") and a_tag["href"].endswith(".nc4"):
            file_paths.append(f"{source_path}/{a_tag['href']}")

    return file_paths


def download_and_store_new_files(
    download_dir: pathlib.Path,
    date_prefix: str,
    new_files: typing.Set[str],
    batch_size: int,
    target_bucket: str,
) -> None:
    """In batches, download files from the source to the local filesystem
    and upload them to the GCS target bucket.
    """
    total_files = len(new_files)
    logging.info(f"Downloading {total_files} files.")
    for n, batch in enumerate(batches(list(new_files), batch_size=batch_size), 1):
        logging.info(
            f"Processing batch {n}: {(n - 1) * batch_size + 1} to {min(total_files, n * batch_size)}"
        )
        download_batch(batch, download_dir)
        move_dir_contents_to_gcs(download_dir, target_bucket, date_prefix)
        update_manifest_file(batch, download_dir, target_bucket, date_prefix)


def download_batch(batch: typing.List[str], download_dir: pathlib.Path) -> None:
    """Download each file in the batch from the source portal via wget."""
    for file_path in batch:
        logging.info(f"Downloading file to {download_dir}/{file_path}")
        subprocess.check_call(
            [
                "wget",
                f"{os.environ['BASE_URL']}/{file_path}",
                "-O",
                f"{download_dir}/{file_path}",
                "-nv",
            ]
        )


def move_dir_contents_to_gcs(
    dir_: pathlib.Path, target_bucket: str, date_prefix: str
) -> None:
    """Upload all local .nc4 files for the date to GCS, then clean up
    temporary upload objects and the local copies.
    """
    subprocess.check_call(
        [
            "gsutil",
            "-m",
            "-o",
            "GSUtil:parallel_composite_upload_threshold=250M",
            "cp",
            f"{dir_}/{date_prefix}/*.nc4",
            f"gs://{target_bucket}/{date_prefix}",
        ]
    )
    delete_temp_pcu_objects(target_bucket)
    delete_dir_contents(dir_ / date_prefix)


def delete_dir_contents(dir_to_delete: pathlib.Path) -> None:
    """Delete directory contents, but not the dir itself.

    This is useful for keeping date dirs such as Y2021/M07/D12 intact
    for the next batch of files to use.
    """
    for f in dir_to_delete.glob("*"):
        if f.is_file():
            f.unlink()


def delete_temp_pcu_objects(target_bucket: str) -> None:
    """Delete temp GCS objects created by gsutil's parallel composite uploads.

    See https://cloud.google.com/storage/docs/uploads-downloads#gsutil-pcu
    """
    res = subprocess.run(
        ["gsutil", "ls", f"gs://{target_bucket}"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    uris = res.stdout.split()
    for uri in uris:
        object_name = uri.split(target_bucket + "/")[-1]
        # Temp composite-upload objects don't follow the Y<year>/... prefix
        if not object_name.startswith("Y"):
            subprocess.check_call(
                ["gsutil", "rm", "-r", f"gs://{target_bucket}/{object_name}"],
            )


def update_manifest_file(
    paths: typing.List[str],
    download_dir: pathlib.Path,
    target_bucket: str,
    date_prefix: str,
) -> None:
    """Append the newly uploaded paths to the local manifest and copy it to GCS."""
    manifest_path = download_dir / MANIFEST_FILE
    with open(manifest_path, "a") as f:
        f.write("\n".join(paths))
        f.write("\n")
    subprocess.check_call(
        [
            "gsutil",
            "cp",
            str(manifest_path),
            f"gs://{target_bucket}/{date_prefix}/{MANIFEST_FILE}",
        ]
    )


def batches(
    file_paths: typing.List[str], batch_size: int
) -> typing.Iterator[typing.List[str]]:
    """Yield successive batches of at most `batch_size` paths from `file_paths`."""
    for i in range(0, len(file_paths), batch_size):
        yield file_paths[i : i + batch_size]


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    assert os.environ["BASE_URL"]
    assert os.environ["TODAY_DIFF"]
    assert os.environ["DOWNLOAD_DIR"]
    assert os.environ["TARGET_BUCKET"]

    main(
        base_url=os.environ["BASE_URL"],
        dt=(date.today() - timedelta(days=int(os.environ["TODAY_DIFF"]))),
        download_dir=pathlib.Path(os.environ["DOWNLOAD_DIR"]).expanduser(),
        target_bucket=os.environ["TARGET_BUCKET"],
        batch_size=int(os.getenv("BATCH_SIZE", 10)),
    )
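
# ---------------------------------------------------------------------------
# Example invocation (a sketch only): the environment values below are
# illustrative placeholders, not taken from any real pipeline configuration.
# BASE_URL is assumed to point at the GEOS-FP portal referenced in
# _date_prefix(); DOWNLOAD_DIR and TARGET_BUCKET are hypothetical.
#
#   BASE_URL="https://portal.nccs.nasa.gov/datashare/gmao/geos-fp/das" \
#   TODAY_DIFF="2" \
#   DOWNLOAD_DIR="/tmp/geos_fp" \
#   TARGET_BUCKET="my-example-bucket" \
#   BATCH_SIZE="10" \
#   python script.py
# ---------------------------------------------------------------------------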