data_validation/gcs_helper.py (70 lines of code) (raw):

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
from typing import List

from google.cloud import storage

from data_validation import client_info

WRITE_SUCCESS_STRING = "Success! Config output written to"
DELETE_SUCCESS_STRING = "Successfully deleted"

# Scheme prefix that marks a path as a Cloud Storage location.
_GCS_PREFIX = "gs://"


def _is_gcs_path(file_path: str) -> bool:
    """Return True when `file_path` starts with the `gs://` prefix."""
    return file_path.startswith(_GCS_PREFIX)


def get_validation_path(name: str) -> str:
    """Returns the full path to a validation.

    GCS paths are returned unchanged; any other name is joined onto `./`.
    """
    if _is_gcs_path(name):
        return name
    return os.path.join("./", name)


def get_gcs_bucket(gcs_file_path: str) -> storage.Bucket:
    """Returns storage.Bucket given GCS file path with prefix.

    Args:
        gcs_file_path: Path of the form `gs://<bucket>/<object path>`.

    Raises:
        ValueError: If creating the bucket handle raises ValueError; the
            original exception is chained as the cause.
    """
    # The bucket name is everything between the prefix and the first "/".
    bucket_name = gcs_file_path[len(_GCS_PREFIX):].partition("/")[0]
    info = client_info.get_http_client_info()
    storage_client = storage.Client(client_info=info)
    try:
        return storage_client.bucket(bucket_name)
    except ValueError as e:
        raise ValueError(
            "GCS Path Failure {} -> {}".format(gcs_file_path, e)
        ) from e


def _get_gcs_file_path(gcs_file_path: str) -> str:
    """
    Returns relative object file path i.e. `path/to/file.yaml` given
    full GCS file path with prefix.

    Returns an empty string when nothing follows the bucket name.
    """
    return gcs_file_path[len(_GCS_PREFIX):].partition("/")[2]


def _read_gcs_file(file_path: str, download_as_text: bool = False):
    """Download the object at `file_path` from GCS.

    Returns the result of `download_as_text()` when `download_as_text` is
    true, otherwise the result of `download_as_bytes()`.

    Raises:
        ValueError: If the blob handle is falsy.
    """
    gcs_bucket = get_gcs_bucket(file_path)
    blob = gcs_bucket.blob(_get_gcs_file_path(file_path))
    # NOTE(review): bucket.blob() appears to always return a Blob handle, so
    # this guard likely never fires and a missing object would instead surface
    # as an error from the download call below -- confirm before relying on it.
    if not blob:
        raise ValueError(f"Invalid Cloud Storage Path: {file_path}")
    return blob.download_as_text() if download_as_text else blob.download_as_bytes()


def _write_gcs_file(file_path: str, data: str):
    """Upload `data` to the GCS object addressed by `file_path`."""
    gcs_bucket = get_gcs_bucket(file_path)
    blob = gcs_bucket.blob(_get_gcs_file_path(file_path))
    blob.upload_from_string(data)


def _delete_gcs_file(file_path: str):
    """Delete a file stored in GCS."""
    gcs_bucket = get_gcs_bucket(file_path)
    blob = gcs_bucket.blob(_get_gcs_file_path(file_path))
    blob.delete()


def read_file(file_path: str, download_as_text: bool = False):
    """Read a file from GCS or the local filesystem, depending on the path.

    `download_as_text` is only forwarded to the GCS read. Local files are
    always opened in text mode and returned as `str`.
    """
    if _is_gcs_path(file_path):
        return _read_gcs_file(file_path, download_as_text)
    with open(file_path, "r") as f:
        return f.read()


def write_file(file_path: str, data: str, include_log: bool = True):
    """Write `data` to GCS or the local filesystem, depending on the path.

    For local paths, missing parent directories are created first.

    Args:
        file_path: Destination; `gs://` paths are uploaded to GCS.
        data: Content to write.
        include_log: When true, log a success message after writing.
    """
    if _is_gcs_path(file_path):
        _write_gcs_file(file_path, data)
    else:
        # A bare filename has an empty dirname, and os.makedirs("") raises
        # FileNotFoundError, so only create directories when there is one.
        parent_dir = os.path.dirname(file_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(file_path, "w") as file:
            file.write(data)

    if include_log:
        logging.info(f"{WRITE_SUCCESS_STRING} {file_path}")


def delete_file(file_path: str, include_log: bool = True):
    """Delete a file from GCS or local filesystem, depending on the path.

    Args:
        file_path: File to delete; `gs://` paths are deleted from GCS.
        include_log: When true, log a success message after deleting.

    Raises:
        FileNotFoundError: If a local `file_path` does not exist.
    """
    if _is_gcs_path(file_path):
        _delete_gcs_file(file_path)
    else:
        if os.path.exists(file_path):
            os.remove(file_path)
        else:
            raise FileNotFoundError(f"File not found: {file_path}")

    if include_log:
        logging.info(f"{DELETE_SUCCESS_STRING}: {file_path}")


def list_gcs_directory(directory_path: str) -> List[str]:
    """List blob names under a GCS directory, relative to that directory.

    Blobs are listed with the object path of `directory_path` as the prefix
    and "/" as the delimiter (presumably limiting results to objects directly
    under the prefix -- confirm against the Cloud Storage listing docs).

    Args:
        directory_path: Path of the form `gs://<bucket>/<prefix>`.

    Returns:
        Blob names with the leading prefix removed.
    """
    gcs_prefix = _get_gcs_file_path(directory_path)
    gcs_bucket = get_gcs_bucket(directory_path)
    prefix_len = len(gcs_prefix)
    # Strip only the leading prefix: str.replace() would also remove any later
    # occurrence of the prefix text inside the object name.
    return [
        blob.name[prefix_len:] if blob.name.startswith(gcs_prefix) else blob.name
        for blob in gcs_bucket.list_blobs(prefix=gcs_prefix, delimiter="/")
    ]