decisionai_plugin/common/util/azureblob.py (52 lines of code) (raw):

from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient from azure.core.exceptions import ResourceExistsError from azure.storage.blob import generate_container_sas, generate_blob_sas, BlobSasPermissions from azure.storage.blob import ResourceTypes, AccountSasPermissions, generate_account_sas from azure.identity import DefaultAzureCredential from .constant import AZURE_STORAGE_ACCOUNT_USE_MI from datetime import datetime from datetime import timedelta from telemetry import log class AzureBlob(): def __init__(self, account_name, account_key=None, account_domain="core.windows.net"): if AZURE_STORAGE_ACCOUNT_USE_MI: account_url = "https://{}.blob.{}".format(account_name, account_domain) self.blob_service_client = BlobServiceClient(account_url, DefaultAzureCredential()) else: connect_str = "DefaultEndpointsProtocol=https;AccountName={};AccountKey={};EndpointSuffix={}".format(account_name, account_key, account_domain) # Create the BlobServiceClient object which will be used to create a container client self.blob_service_client = BlobServiceClient.from_connection_string(connect_str) self.account_domain = account_domain def create_container(self, container_name): # Create the container try: self.blob_service_client.create_container(container_name) except ResourceExistsError: log.info("Container %s already exists!" % container_name) def upload_blob(self, container_name, blob_name, data, replace = True): blob_client = self.blob_service_client.get_blob_client(container=container_name, blob=blob_name) log.info("\nUploading to Azure Storage as blob:\n\t" + blob_name) # Upload the created file blob_client.upload_blob(data, overwrite=replace) log.info("Blob %s in container %s has been uploaded/updated!" % (blob_name, container_name)) def list_blob(self, container_name): log.info("\nListing blobs...") # List the blobs in the container blob_list = self.blob_service_client.get_container_client(container_name).list_blobs() blobs = [] for blob in blob_list: blobs.append(blob.name) return blobs def delete_blob(self, container_name, blob_name): log.info("\nDelete blob...") blob_client = self.blob_service_client.get_blob_client(container=container_name, blob=blob_name) blob_client.delete_blob() def download_blob(self, container_name, blob_name, download_file_path): log.info("\nDownloading blob to \n\t" + download_file_path) blob_client = self.blob_service_client.get_blob_client(container=container_name, blob=blob_name) with open(download_file_path, "wb") as download_file: download_file.write(blob_client.download_blob().readall()) def delete_container(self, container_name): log.info("Deleting blob container...") self.blob_service_client.delete_container(container_name) def generate_blob_sas(self, container_name, blob_name): log.info("Generating blob sas...") blob_sas = generate_blob_sas(account_name=self.blob_service_client.account_name, account_key=self.blob_service_client.credential.account_key, container_name=container_name, blob_name=blob_name, permission=BlobSasPermissions(read=True), expiry=datetime.utcnow() + timedelta(days=1)) return 'https://' + self.blob_service_client.account_name +'.blob.{}/'.format(self.account_domain) + container_name + '/' + blob_name + '?' + blob_sas