in connectors/blob.py [0:0]
def __init__(self, blob_url, credential=None):
    """
    Initialize BlobClient with a specific blob URL.

    The URL is split into the storage account URL, the container name and the
    URL-decoded blob name, then a BlobServiceClient is created for the account.

    :param blob_url: URL of the blob (e.g., "https://mystorage.blob.core.windows.net/mycontainer/myblob.png")
    :param credential: Credential for authentication (optional); it is passed
        through ``self._get_credential`` before being stored and used
    :raises EnvironmentError: if the URL cannot be parsed, or has no scheme,
        no host, or no container name
    :raises Exception: any error raised while constructing BlobServiceClient
        is logged and re-raised unchanged
    """
    # 1. Generate the credential in case it is not provided
    self.credential = self._get_credential(credential)
    self.file_url = blob_url
    self.blob_service_client = None

    # 2. Parse the blob URL => account_url, container_name, blob_name
    try:
        parsed_url = urlparse(self.file_url)
        # Without both parts the account URL would degenerate to "://".
        if not parsed_url.scheme or not parsed_url.netloc:
            raise ValueError("missing scheme or storage account host")

        # The path looks like "/<container>/<blob name...>": drop the leading
        # slash, then split once so slashes inside the blob name are preserved.
        container_name, _, raw_blob_name = parsed_url.path[1:].partition("/")
        if not container_name:
            raise ValueError("missing container name")

        self.account_url = f"{parsed_url.scheme}://{parsed_url.netloc}"  # e.g. https://mystorage.blob.core.windows.net
        self.container_name = container_name  # e.g. 'mycontainer'
        # Blob name is everything after "/{container_name}/", percent-decoded.
        self.blob_name = unquote(raw_blob_name)
        logging.debug(f"[blob][{self.blob_name}] Parsed blob URL successfully.")
    except Exception as e:
        # Every parsing/validation failure is reported as EnvironmentError;
        # "from e" keeps the original cause in the traceback.
        logging.error(f"[blob] Invalid blob URL '{self.file_url}': {e}")
        raise EnvironmentError(f"Invalid blob URL '{self.file_url}': {e}") from e

    # 3. Initialize the BlobServiceClient
    try:
        self.blob_service_client = BlobServiceClient(
            account_url=self.account_url,
            credential=self.credential,
        )
        logging.debug(f"[blob][{self.blob_name}] Initialized BlobServiceClient.")
    except Exception as e:
        logging.error(f"[blob][{self.blob_name}] Failed to initialize BlobServiceClient: {e}")
        raise