# data_validation/gcs_helper.py
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from typing import List
from google.cloud import storage
from data_validation import client_info
# Log-message prefixes emitted by write_file()/delete_file() below.
# NOTE(review): these look like they may be matched by tests or callers
# parsing log output — verify before changing the wording.
WRITE_SUCCESS_STRING = "Success! Config output written to"
DELETE_SUCCESS_STRING = "Successfully deleted"
def _is_gcs_path(file_path: str) -> bool:
return True if file_path.startswith("gs://") else False
def get_validation_path(name: str) -> str:
    """Return the full path to a validation config.

    GCS URIs are already absolute and are returned untouched; any other
    name is anchored to the current working directory.
    """
    is_remote = name.startswith("gs://")
    return name if is_remote else os.path.join("./", name)
def get_gcs_bucket(gcs_file_path: str) -> storage.Bucket:
    """Return the storage.Bucket for a full GCS file path.

    Args:
        gcs_file_path: Full GCS URI with the "gs://" prefix,
            e.g. "gs://bucket/path/to/file.yaml".

    Raises:
        ValueError: If the bucket name embedded in the path is invalid.
    """
    # The bucket name is the first path segment after the "gs://" prefix.
    bucket_name = gcs_file_path[5:].split("/")[0]
    info = client_info.get_http_client_info()
    storage_client = storage.Client(client_info=info)
    try:
        return storage_client.bucket(bucket_name)
    except ValueError as e:
        # Chain the original exception so the root cause stays in the traceback.
        raise ValueError("GCS Path Failure {} -> {}".format(gcs_file_path, e)) from e
def _get_gcs_file_path(gcs_file_path: str) -> str:
"""
Returns relative object file path i.e. `path/to/file.yaml`
given full GCS file path with prefix.
"""
return "".join(gcs_file_path[5:].split("/", 1)[1:])
def _read_gcs_file(file_path: str, download_as_text: bool = False):
    """Download a GCS object: `str` when `download_as_text`, else `bytes`."""
    bucket = get_gcs_bucket(file_path)
    blob = bucket.blob(_get_gcs_file_path(file_path))
    if not blob:
        raise ValueError(f"Invalid Cloud Storage Path: {file_path}")
    if download_as_text:
        return blob.download_as_text()
    return blob.download_as_bytes()
def _write_gcs_file(file_path: str, data: str):
    """Upload `data` to the GCS object identified by `file_path`."""
    object_path = _get_gcs_file_path(file_path)
    bucket = get_gcs_bucket(file_path)
    bucket.blob(object_path).upload_from_string(data)
def _delete_gcs_file(file_path: str):
    """Delete the GCS object identified by `file_path`."""
    object_path = _get_gcs_file_path(file_path)
    bucket = get_gcs_bucket(file_path)
    bucket.blob(object_path).delete()
def read_file(file_path: str, download_as_text: bool = False):
    """Read a file from GCS or the local filesystem, depending on the path.

    NOTE(review): `download_as_text` only affects GCS reads; local files
    are always read in text mode — confirm callers depend on this before
    changing it.
    """
    if file_path.startswith("gs://"):
        return _read_gcs_file(file_path, download_as_text)
    with open(file_path, "r") as local_file:
        return local_file.read()
def write_file(file_path: str, data: str, include_log: bool = True):
    """Write `data` to GCS or the local filesystem, depending on the path.

    Args:
        file_path: Destination path; a "gs://" prefix selects GCS.
        data: File contents to write.
        include_log: When True, log a success message after writing.
    """
    if file_path.startswith("gs://"):
        _write_gcs_file(file_path, data)
    else:
        parent_dir = os.path.dirname(file_path)
        # os.makedirs("") raises FileNotFoundError, so only create parent
        # directories when the path actually has a directory component
        # (a bare filename like "config.yaml" previously crashed here).
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(file_path, "w") as file:
            file.write(data)
    if include_log:
        logging.info(f"{WRITE_SUCCESS_STRING} {file_path}")
def delete_file(file_path: str, include_log: bool = True):
    """Delete a file from GCS or local filesystem, depending on the path.

    Raises:
        FileNotFoundError: If a local `file_path` does not exist.
    """
    if file_path.startswith("gs://"):
        _delete_gcs_file(file_path)
    elif os.path.exists(file_path):
        os.remove(file_path)
    else:
        raise FileNotFoundError(f"File not found: {file_path}")
    if include_log:
        logging.info(f"{DELETE_SUCCESS_STRING}: {file_path}")
def list_gcs_directory(directory_path: str) -> List[str]:
    """List object names under a GCS "directory", relative to that prefix.

    Args:
        directory_path: Full GCS URI of the directory, e.g. "gs://bucket/dir/".

    Returns:
        Object names with the leading directory prefix removed; the "/"
        delimiter keeps the listing non-recursive.
    """
    gcs_prefix = _get_gcs_file_path(directory_path)
    gcs_bucket = get_gcs_bucket(directory_path)
    # Strip only the *leading* prefix: str.replace() would also delete any
    # later occurrence of the prefix inside an object name
    # (e.g. prefix "data/" applied to "data/data/x" must yield "data/x").
    prefix_len = len(gcs_prefix)
    return [
        f.name[prefix_len:] if f.name.startswith(gcs_prefix) else f.name
        for f in gcs_bucket.list_blobs(prefix=gcs_prefix, delimiter="/")
    ]