in demo_app/demo_app.py [0:0]
def _delete_blob_best_effort(blob_client) -> bool:
    """Delete the blob behind *blob_client*, logging instead of raising on failure.

    Returns True when the delete call succeeded and False otherwise.
    """
    try:
        blob_client.delete_blob()
    except Exception:
        # Cleanup must never mask the result that is being returned to the UI.
        logging.warning("Failed to delete blob during cleanup.", exc_info=True)
        return False
    return True


def blob_form_to_cosmosdb_process(file: str) -> tuple:
    """Upload a local file to Blob Storage and poll CosmosDB for its result.

    The file is uploaded to the ``blob-form-to-cosmosdb-blobs`` container under
    a timestamp-suffixed name, then CosmosDB is queried every 2 seconds for a
    record whose ``filename`` equals that blob name (presumably written by a
    blob-triggered pipeline that is not visible from this function).

    Args:
        file: Path of the local file to upload.

    Returns:
        A ``(status_code, time_taken_text, payload)`` tuple:

        * ``(200, "<n> seconds", item)`` - the first matching CosmosDB item.
        * ``(404, "<n> seconds", {"Error": ...})`` - no record appeared within
          the polling timeout.
        * ``(500, "<n> seconds", {"Error": ...})`` - any exception was raised.

        Exceptions are never propagated to the caller. In every case a blob
        uploaded by this call is removed again (best effort).
    """
    start_time = time.time()

    def elapsed_text() -> str:
        # Client-side wall-clock time since the call started, as shown in the UI.
        return f"{round(time.time() - start_time, 1)} seconds"

    blob_client = None
    # Only delete what this call actually created: if the upload itself fails
    # (e.g. a same-second name collision with another request), the blob under
    # this name is not ours to remove.
    blob_uploaded = False
    try:
        # Append a timestamp suffix to the filename to reduce name conflicts.
        # NOTE: one-second resolution, so identical filenames uploaded within
        # the same second still collide.
        timestamp_suffix = str(int(time.time()))
        name, ext = os.path.splitext(os.path.basename(file))
        blob_name = f"{name}_{timestamp_suffix}{ext}"
        # Upload data to blob storage
        blob_client = blob_service_client.get_blob_client(
            container="blob-form-to-cosmosdb-blobs", blob=blob_name
        )
        with open(file, "rb") as f:
            data = f.read()
        blob_client.upload_blob(data)
        blob_uploaded = True
        # Query the CosmosDB container until a result is found
        gr.Info(
            "File uploaded to Blob Storage. Polling CosmosDB until the result appears..."
        )
        cosmos_timeout_secs = 30
        query = "SELECT * FROM c WHERE c.filename = @filename"
        parameters = [{"name": "@filename", "value": blob_name}]
        items = None
        cosmos_query_timer = time.time()
        while not items:
            time.sleep(2)
            if time.time() - cosmos_query_timer > cosmos_timeout_secs:
                # If no record is found, return a placeholder result
                timeout_error_text = f"Timeout: No result found in CosmosDB after {cosmos_timeout_secs} seconds."
                client_side_time_taken = elapsed_text()
                # Do not leave the uploaded file behind when giving up.
                _delete_blob_best_effort(blob_client)
                return (
                    404,
                    client_side_time_taken,
                    {"Error": timeout_error_text},
                )
            items = list(
                blob_form_extraction_to_cosmosdb_container_client.query_items(
                    query=query,
                    parameters=parameters,
                    enable_cross_partition_query=True,
                )
            )
        client_side_time_taken = elapsed_text()
        # Delete the blob from storage
        blob_client.delete_blob()
        gr.Info("Task complete. Deleting the file from blob storage...")
        # Return status code, time taken and the first item from the list
        # (it should only have one item)
        return (200, client_side_time_taken, items[0])
    except Exception as e:
        logging.exception(
            "Error occurred during blob upload and CosmosDB query."
        )
        # Ensure the blob is deleted from storage if an error occurs.
        # gr.Warning is used for the toast because gr.Error only has an effect
        # when raised, and this function reports errors via its return value.
        if blob_uploaded and _delete_blob_best_effort(blob_client):
            gr.Warning("Error occurred. Deleting the file from blob storage...")
        # Return the result including the error.
        return (500, elapsed_text(), {"Error": str(e)})