experiments/arena/scripts/gcs_bulk_uploader.py

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
from pathlib import Path
from typing import Dict, List, Optional, Union

import dotenv
import fire
from alive_progress import alive_bar
from google.cloud import storage
from google.cloud.storage import transfer_manager

from config.default import Default

# Load environment variables from .env file.
dotenv.load_dotenv(override=True)

# Initialize the default configuration.
# Default is a singleton class that manages the configuration for the application.
config = Default()


class GCSUploader:
    """Singleton class for uploading directories to Google Cloud Storage."""

    _instances = {}  # One instance per (bucket, project) pair.

    def __new__(cls, bucket_name: str, project_id: Optional[str] = None):
        key = (bucket_name, project_id)
        if key not in cls._instances:
            instance = super(GCSUploader, cls).__new__(cls)
            instance.storage_client = storage.Client(project=project_id)
            instance.bucket = instance.storage_client.bucket(bucket_name)
            instance._setup_logging()
            instance.logger.info(
                f"Initialized GCSUploader for bucket: {bucket_name}, project: {project_id}"
            )
            cls._instances[key] = instance
        return cls._instances[key]

    def __init__(self, bucket_name: str, project_id: Optional[str] = None):
        # __new__ fully configures new instances; reused singletons need no further setup.
        if not hasattr(self, "bucket"):
            self.bucket = self.storage_client.bucket(bucket_name)
            self.logger.info(
                f"GCSUploader instance created for bucket: {bucket_name}, project: {project_id}"
            )

    def _setup_logging(self):
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        # Avoid attaching duplicate handlers when several uploader instances share this logger.
        if not self.logger.handlers:
            ch = logging.StreamHandler()
            ch.setLevel(logging.INFO)
            formatter = logging.Formatter(
                "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            )
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

    def upload_dir_to_gcs(
        self,
        src_dir: str,
        gcs_destination_directory: str,
        workers: int = os.cpu_count(),
        verbose: bool = False,
        skip_if_exists: bool = False,
        extensions: Optional[List[str]] = None,
    ) -> Dict[str, Union[None, Exception]]:
        """Upload every file in a directory, including all files in subdirectories."""
        # Validate the source directory.
        if not os.path.isdir(src_dir):
            self.logger.error(f"Source directory {src_dir} is not a valid directory.")
            raise ValueError(f"Source directory {src_dir} is not a valid directory.")

        # Validate the destination directory.
        if not gcs_destination_directory:
            self.logger.error("Destination directory cannot be empty.")
            raise ValueError("Destination directory cannot be empty.")

        self.logger.info(
            f"Starting upload from {src_dir} to {self.bucket.name}/{gcs_destination_directory}"
        )

        # Collect relative paths of all files, optionally filtered by extension
        # (extensions are compared without their leading dot).
        dir_as_path_objs = Path(src_dir)
        paths = [
            str(path.relative_to(src_dir))
            for path in dir_as_path_objs.rglob("*")
            if path.is_file()
            and (extensions is None or path.suffix[1:].lower() in extensions)
        ]

        self.logger.info(f"Found {len(paths)} files to upload.")
        if verbose:
            self._log(f"Found {len(paths)} files in directory: {src_dir}")
            self._log("Starting upload...")

        upload_results: Dict[str, Union[None, Exception]] = {}
        try:
            with alive_bar(len(paths), title="Uploading...", force_tty=True) as bar:
                self.logger.info(f"Using {workers} workers for upload.")
                results = transfer_manager.upload_many_from_filenames(
                    self.bucket,
                    paths,
                    source_directory=src_dir,
                    blob_name_prefix=f"{gcs_destination_directory}/",
                    max_workers=workers,
                    skip_if_exists=skip_if_exists,
                )
                # The transfer manager uploads the whole batch in a single call,
                # so the bar advances once when that call returns.
                bar()
            self.logger.info("Upload completed by transfer manager.")

            # upload_many_from_filenames returns one result per input path:
            # None on success, or the Exception that caused the failure.
            for name, result in zip(paths, results):
                upload_results[name] = result
                if isinstance(result, Exception):
                    self.logger.error(f"Failed to upload {name} due to exception: {result}")
                    self._log(
                        f"Failed to upload {name} due to exception: {result}",
                        level=logging.ERROR,
                    )
                elif verbose:
                    self.logger.info(f"Uploaded {name} to {self.bucket.name}.")
                    self._log(f"Uploaded {name} to {self.bucket.name}.")
        except Exception as e:
            self.logger.error(f"Error during upload: {e}")
            self._log(f"Error during upload: {e}", level=logging.ERROR)
            return {"error": e}

        self.logger.info("Upload process finished.")
        return upload_results

    def _log(self, message: str, level: int = logging.INFO):
        """Internal logging helper."""
        self.logger.log(level, message)


def main(
    bucket_name: str,
    source_directory: str,
    destination_directory: str = "",
    verbose: bool = False,
    skip_if_exists: bool = False,
    extensions: Optional[str] = ".json,png",
    project_id: Optional[str] = config.PROJECT_ID,
):
    """Uploads files from a local directory to a GCS bucket.

    Args:
        bucket_name: The GCS bucket name, e.g. "my-gcs-bucket" without the "gs://" prefix.
        source_directory: The local directory path.
        destination_directory: Optional GCS destination prefix.
        verbose: Enable verbose output.
        skip_if_exists: Skip files that already exist in the bucket.
        extensions: Optional comma-separated file extensions (e.g., "png,json").
        project_id: Optional Google Cloud Project ID.
    """
    # Default the destination directory to the base name of the source directory.
    if not destination_directory:
        destination_directory = os.path.basename(source_directory.rstrip("/\\"))
        if not destination_directory:
            raise ValueError("Destination directory cannot be empty.")
        logging.info(
            "Destination directory not provided. Using the base name of the "
            f"source directory: {destination_directory}"
        )

    # The bucket name must not include the gs:// scheme; strip it if present.
    if bucket_name.startswith("gs://"):
        logging.info("Bucket name should not start with 'gs://'. Removing 'gs://' prefix.")
        bucket_name = bucket_name[5:]

    logging.info(
        f"Starting main function with bucket: {bucket_name}, source: {source_directory}, "
        f"dest subfolder: {destination_directory}, project: {project_id}"
    )

    # Normalize extensions to a lowercase list without leading dots so they match the
    # suffix comparison in upload_dir_to_gcs (e.g. ".json,png" -> ["json", "png"]).
    if extensions:
        extensions_list = [ext.strip().lower().lstrip(".") for ext in extensions.split(",")]
    else:
        extensions_list = None

    uploader = GCSUploader(bucket_name, project_id)
    try:
        results = uploader.upload_dir_to_gcs(
            src_dir=source_directory,
            gcs_destination_directory=destination_directory,
            verbose=verbose,
            skip_if_exists=skip_if_exists,
            extensions=extensions_list,
        )
        if "error" in results:
            logging.error("Upload failed. See logs for details.")
            print("Upload failed. See logs for details.")
        else:
            logging.info("Upload completed.")
            print("Upload completed.")
            if verbose:
                for filename, result in results.items():
                    if isinstance(result, Exception):
                        print(f"Failed: {filename}")
                        logging.error(f"Failed to upload {filename}.")
    except ValueError as e:
        logging.error(f"Error: {e}")
        print(f"Error: {e}")
    except Exception as e:
        logging.exception(f"An unexpected error occurred: {e}")
        print(f"An unexpected error occurred: {e}")

    logging.info("Main function finished.")


if __name__ == "__main__":
    # Configure root logging so the module-level logging calls in main() are visible.
    logging.basicConfig(level=logging.INFO)
    fire.Fire(main)
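
# Example invocation (a sketch only; the bucket, directory, and flag values below are
# hypothetical placeholders, not values taken from this repository):
#
#   python gcs_bulk_uploader.py \
#       --bucket_name=my-gcs-bucket \
#       --source_directory=./arena_outputs \
#       --extensions="json,png" \
#       --skip_if_exists=True \
#       --verbose=True
#
# python-fire maps each keyword argument of main() onto a command-line flag of the same
# name, so any flag left out falls back to the defaults above, including project_id,
# which is read from Default().PROJECT_ID after the .env file is loaded.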