# backend/bms_app/services/gcs.py
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from datetime import datetime

import google.auth
import google.auth.transport.requests as tr_requests
from google.cloud import storage
from google.resumable_media.requests import ResumableUpload

from bms_app import settings


READ_WRITE_SCOPE = 'https://www.googleapis.com/auth/devstorage.read_write'
URL_TEMPLATE = (
    'https://www.googleapis.com/upload/storage/v1/b/{bucket}/o'
    '?uploadType=resumable'
)
CHUNK_SIZE = 1024 * 1024  # 1 MB
CONTENT_TYPE = 'application/octet-stream'

logger = logging.getLogger(__name__)


def parse_gcs_uri(gcs_full_path):
    """Extract the bucket name and file path from a full GCS URI.

    Return: (bucket_name, file_path)

    Examples:
        gs://some-bucket/path/file.py -> ('some-bucket', 'path/file.py')
        some-bucket/path/file.py -> ('some-bucket', 'path/file.py')
    """
    if gcs_full_path.startswith('gs://'):
        gcs_full_path = gcs_full_path[5:]
    parts = [x for x in gcs_full_path.split('/') if x]
    bucket_name = parts[0]
    # GCS object names always use '/' separators, so join explicitly
    # instead of using the platform-dependent os.path.join.
    file_path = '/'.join(parts[1:])
    return (bucket_name, file_path)


def upload_stream_to_gcs(stream, bucket_name, key):
    """Upload a stream object to a GCS bucket via a resumable upload."""
    credentials, _ = google.auth.default(scopes=(READ_WRITE_SCOPE,))
    transport = tr_requests.AuthorizedSession(credentials)
    upload_url = URL_TEMPLATE.format(bucket=bucket_name)
    upload = ResumableUpload(upload_url, CHUNK_SIZE)
    metadata = {'name': key}
    upload.initiate(
        transport, stream, metadata,
        CONTENT_TYPE, stream_final=False
    )
    # The server answers 308 for every intermediate chunk, so rely on the
    # upload's own "finished" flag rather than comparing status codes,
    # which could loop forever on a failed upload.
    response = upload.transmit_next_chunk(transport)
    while not upload.finished:
        response = upload.transmit_next_chunk(transport)
    return response.json()
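
# A minimal usage sketch (bucket and object names here are hypothetical;
# assumes default application credentials are available):
#
#   with open('/tmp/export.dmp', 'rb') as stream:
#       info = upload_stream_to_gcs(stream, 'my-bucket', 'exports/export.dmp')
#       logger.debug('uploaded object: %s', info.get('name'))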


def upload_blob(bucket_name, key, source_file):
    """Uploads a file object to the bucket."""
    logger.debug('upload blob bucket:%s key:%s', bucket_name, key)
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(key)
blob.upload_from_file(source_file)
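
# Usage sketch (hypothetical names; the file must be opened in binary mode):
#
#   with open('pfile.ora', 'rb') as fileobj:
#       upload_blob('my-bucket', 'ora-configs/pfile.ora', fileobj)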


def upload_blob_from_string(bucket_name, key, content):
    """Uploads string content to the bucket."""
    logger.debug('upload blob from string bucket:%s key:%s', bucket_name, key)
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(key)
blob.upload_from_string(content)


def list_blobs(bucket_name, prefix=None):
    """List the file blobs in the bucket.

    Keyword arguments:
    bucket_name -- name of the bucket
    prefix -- folder/subfolder to filter by, e.g. "ora-binaries/". Default=None
    """
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    date_format = '%Y-%m-%d %H:%M:%S'
    blobs = []
    for blob in bucket.list_blobs(prefix=prefix):
        if not blob.name.endswith('/'):  # skip "folder" placeholder objects
            # blob.generation is the object's creation time expressed in
            # microseconds since the epoch.
            created = datetime.fromtimestamp(blob.generation / 10**6)
            blobs.append({
                'name': blob.name,
                'size': blob.size,
                'type': blob.content_type,
                'created_date': created.strftime(date_format),
                'last_modified': blob.updated.strftime(date_format),
            })
    return blobs
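
# Each returned item is a plain dict; the values below are illustrative only:
#
#   {'name': 'ora-binaries/db.zip', 'size': 1048576,
#    'type': 'application/zip',
#    'created_date': '2022-01-01 12:00:00',
#    'last_modified': '2022-01-01 12:00:00'}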


def get_file_content(bucket_name, file_path):
    """Return file content from the bucket as text.

    Keyword arguments:
    bucket_name -- name of the bucket
    file_path -- full object name, e.g. "ora-configs/pfile.ora"
    """
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.get_blob(file_path)
    # download_as_bytes() replaces the deprecated download_as_string();
    # decode converts the raw bytes to unicode text.
    data_bytes = blob.download_as_bytes()
    content = data_bytes.decode()
    return content
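
# Usage sketch (hypothetical names):
#
#   pfile_text = get_file_content('my-bucket', 'ora-configs/pfile.ora')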


def delete_blob(bucket_name, blob_name):
    """Delete a blob from the bucket if it exists."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(blob_name)
if blob.exists():
blob.delete()


def copy_blob(bucket_name,
              blob_name,
              destination_bucket_name,
              destination_blob_name):
"""Copies a blob from one bucket to another with a new name."""
storage_client = storage.Client()
source_bucket = storage_client.bucket(bucket_name)
source_blob = source_bucket.blob(blob_name)
destination_bucket = storage_client.bucket(destination_bucket_name)
blob_copy = source_bucket.copy_blob(
source_blob, destination_bucket, destination_blob_name
)
    logger.debug(
        'copy blob from %s/%s to %s/%s',
        bucket_name, blob_name,
        destination_bucket_name, destination_blob_name
    )
return blob_copy


def create_file_link(gcs_key):
    """Create a Cloud Console link to the file in GCS."""
    # Build the URL with '/' explicitly; os.path.join would use the wrong
    # separator on Windows.
    file_link = '/'.join((
        'https://console.cloud.google.com/storage/browser/_details',
        settings.GCS_BUCKET,
        gcs_key,
    ))
    return file_link
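
# For example, assuming settings.GCS_BUCKET == 'my-bucket' (hypothetical):
#
#   create_file_link('configs/pfile.ora')
#   -> 'https://console.cloud.google.com/storage/browser/_details/my-bucket/configs/pfile.ora'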


def blob_exists(bucket_name, key):
    """Check the existence of a file in the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    return storage.Blob(bucket=bucket, name=key).exists(storage_client)
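
# Usage sketch (hypothetical names):
#
#   if blob_exists('my-bucket', 'ora-configs/pfile.ora'):
#       content = get_file_content('my-bucket', 'ora-configs/pfile.ora')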