# src/dfcx_scrapi/tools/gcs_utils.py
"""Utility and helper methods for using Google Cloud Storage."""
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os

from google.api_core.exceptions import NotFound
from google.cloud import storage


class GcsUtils:
    """Utility class for reading and writing objects in Google Cloud Storage."""

    def __init__(self, gcs_path: str = None):
        self.client = storage.Client()
        self.gcs_path = gcs_path
        self.bucket_name = None
        self.file_path = None

        if gcs_path:
            self.bucket_name, self.file_path = self.get_bucket_name_and_path(
                gcs_path
            )
            self.bucket_exists(self.bucket_name)

    @staticmethod
    def get_bucket_name_and_path(gcs_path: str):
        """Split a `gs://` URI into its bucket name and object path."""
        if not gcs_path.startswith("gs://"):
            raise ValueError("Invalid GCS path. Must start with 'gs://'")

        bucket_and_path = gcs_path[len("gs://"):]
        bucket_name, _, file_path = bucket_and_path.partition("/")

        return bucket_name, file_path
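
    # Example (hypothetical values, assuming the URI includes an object path):
    #   get_bucket_name_and_path("gs://my-bucket/agents/export.json")
    #   -> ("my-bucket", "agents/export.json")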

    def get_fully_qualified_path(self, filename: str):
        """Get the fully qualified object path for the given filename."""
        if self.file_path:
            return f"{self.file_path}/{filename}"
        else:
            return filename

    def read_file(self, filepath: str):
        """Read a file from a GCS bucket and return its contents as text."""
        # 1. Validate the file path
        if not filepath.startswith("gs://"):
            raise ValueError("Invalid filepath. Must start with 'gs://'")

        # 2. Extract the bucket name and object path
        bucket_name, file_path = self.get_bucket_name_and_path(filepath)

        # 3. Access Google Cloud Storage
        bucket = self.client.bucket(bucket_name)
        blob = bucket.blob(file_path)

        # 4. Read the file content
        file_content = blob.download_as_text()

        return file_content

    def write_file(
        self,
        bucket_name: str,
        local_file_path: str,
        destination_name: str = None,
    ):
        """Write a local file to a GCS bucket."""
        # 1. Validate bucket name
        if not bucket_name:
            raise ValueError("Bucket name cannot be empty")

        # 2. Determine the destination object name
        if destination_name:
            file_path = destination_name
        else:
            file_path = os.path.basename(local_file_path)

        # 3. Access Google Cloud Storage
        bucket = self.client.bucket(bucket_name)
        blob = bucket.blob(file_path)

        # 4. Upload the file
        with open(local_file_path, "rb") as f:
            blob.upload_from_file(f)

        return f"File uploaded to gs://{bucket_name}/{file_path}"

    def write_dict_to_gcs(self, bucket_name: str, data: dict, filename: str):
        """Write a dict as a JSON file to a GCS bucket."""
        if not bucket_name or not filename:
            raise ValueError("Bucket name and filename cannot be empty")

        bucket = self.client.bucket(bucket_name)
        blob = bucket.blob(filename)

        json_string = json.dumps(data, indent=4)
        blob.upload_from_string(json_string, content_type="application/json")

    def load_file_if_exists(self, bucket_name: str, filename: str):
        """Return the contents of a GCS file if it exists, otherwise None."""
        bucket = self.client.bucket(bucket_name)
        blob = bucket.blob(filename)
        if blob.exists():
            full_path = f"gs://{bucket_name}/{filename}"
            return self.read_file(filepath=full_path)
        else:
            return None

    def bucket_exists(self, bucket_name: str):
        """Check that a GCS bucket exists, raising NotFound if it does not."""
        try:
            self.client.get_bucket(bucket_name)
        except NotFound as err:
            raise NotFound(
                f"GCS Bucket `{bucket_name}` does not exist."
            ) from err