SamplesV2/AzureBatchIntegration/src/decompressor.py

import logging
import time
import sys
import json
import asyncio
import tarfile
import os
import errno
import shutil
from pathlib import Path
from aiofile import async_open
from azure.storage.blob.aio import BlobServiceClient


class Decompressor():
    """Downloads compressed blobs from a source container, extracts them locally,
    and uploads the extracted files to a destination container."""

    def __init__(self, conn_str):
        self.conn_str = conn_str

    async def process_blob_files(self, src_container, dest_container, local_temp_dir):
        """Process every blob in the source container concurrently."""
        try:
            self.init(local_temp_dir)

            # Create the BlobServiceClient object which will be used to create a container client
            blob_service_client = BlobServiceClient.from_connection_string(self.conn_str)
            async with blob_service_client:
                container_client = blob_service_client.get_container_client(src_container)
                tasks = []
                async for blob in container_client.list_blobs():
                    blob_client = container_client.get_blob_client(blob.name)
                    local_file = "{}/{}".format(local_temp_dir, os.path.basename(blob.name))
                    tasks.append(self.process_blob_file(blob_client, dest_container, local_file))
                await asyncio.gather(*tasks)

            self.cleanup(local_temp_dir)
        except Exception as e:
            print(repr(e))
            sys.exit(1)

    async def process_blob_file(self, blob_client, dest_container, local_file):
        """Download, decompress, and upload a single compressed blob, logging the duration of each phase."""
        download_start = time.time()
        logging.info(f"start downloading file: {local_file}")
        await self.download_compressed_file(blob_client, local_file)
        download_end = time.time()
        logging.info(f"end downloading file: {local_file}, takes {download_end - download_start}")

        decompress_start = time.time()
        logging.info(f"start decompressing file: {local_file}")
        decompression_dir = self.decompress(local_file)
        decompress_end = time.time()
        logging.info(f"end decompressing file: {local_file}, takes {decompress_end - decompress_start}")

        upload_start = time.time()
        logging.info(f"start uploading files for: {local_file}")
        properties = await blob_client.get_blob_properties()
        metadata = properties.metadata
        metadata_json = json.dumps(metadata)
        await self.upload_decompressed_files(dest_container, decompression_dir, metadata_json)
        upload_end = time.time()
        logging.info(f"end uploading files for: {local_file}, takes {upload_end - upload_start}")

    async def download_compressed_file(self, blob_client, local_file):
        # Stream the blob contents to a local file asynchronously
        async with async_open(local_file, 'wb') as afp:
            download_stream = await blob_client.download_blob()
            await afp.write(await download_stream.readall())

    def get_files_to_upload(self, decompression_dir):
        # Walk the extracted directory tree and collect every file path
        files = []
        # r=root, d=directories, f=files
        for r, d, f in os.walk(decompression_dir):
            for file in f:
                files.append(os.path.join(r, file))
        return files

    async def upload_decompressed_files(self, dest_container, decompression_dir, metadata_json):
        blob_service_client = BlobServiceClient.from_connection_string(self.conn_str)
        files_to_upload = self.get_files_to_upload(decompression_dir)
        async with blob_service_client:
            for file in files_to_upload:
                await self.upload_decompressed_file(blob_service_client, file, dest_container, metadata_json)

    async def upload_decompressed_file(self, blob_service_client, file, dest_container, metadata_json):
        # Note: metadata_json is carried through from the source blob but is not
        # applied to the uploaded blob here.
        blob_name = os.path.basename(file)
        blob_client = blob_service_client.get_blob_client(container=dest_container, blob=blob_name)
        async with async_open(file, "rb") as afp:
            await blob_client.upload_blob(await afp.read(), overwrite=True)

    def decompress(self, downloaded_file):
        # Extract the tar archive into a directory named after the file (extension stripped)
        decompression_dir = os.path.splitext(downloaded_file)[0]
        tar = tarfile.open(downloaded_file)
        tar.extractall(decompression_dir)
        tar.close()
        return decompression_dir

    # Set up a clean temporary directory for processing
    def init(self, local_temp_dir):
        if os.path.exists(local_temp_dir):
            shutil.rmtree(local_temp_dir)
        try:
            os.makedirs(local_temp_dir)
        except OSError as exc:
            # Guard against race condition
            if exc.errno != errno.EEXIST:
                raise

    def cleanup(self, local_temp_dir):
        shutil.rmtree(local_temp_dir)
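

# --- Usage sketch (not part of the original sample) ---
# A minimal, hypothetical example of driving the Decompressor end to end.
# The environment variable, container names, and temp directory below are
# placeholder assumptions, not values taken from the original sample.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Assumed environment variable holding the storage account connection string.
    conn_str = os.environ["AZURE_STORAGE_CONNECTION_STRING"]

    decompressor = Decompressor(conn_str)
    # Hypothetical source/destination containers and scratch directory.
    asyncio.run(
        decompressor.process_blob_files(
            src_container="compressed-input",
            dest_container="decompressed-output",
            local_temp_dir="/tmp/decompressor",
        )
    )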