pipeline/activities/writeToBlob.py (36 lines of code) (raw):
import azure.durable_functions as df
import logging
from pipelineUtils.blob_functions import list_blobs, get_blob_content, write_to_blob
import os
from configuration import Configuration
config = Configuration()
NEXT_STAGE = config.get_value("NEXT_STAGE")
name = "writeToBlob"
bp = df.Blueprint()
@bp.function_name(name)
@bp.activity_trigger(input_name="args")
def extract_text_from_blob(args: dict):
"""
Writes the JSON bytes to a blob storage.
Args:
args (dict): A dictionary containing the blob name and JSON bytes.
"""
try:
args['json_bytes'] = args['json_str'].encode('utf-8')
sourcefile = os.path.splitext(os.path.basename(args['blob_name']))[0]
result = write_to_blob(NEXT_STAGE, f"{sourcefile}-output.json", args['json_bytes'])
if result:
logging.info(f"Successfully wrote output to blob {args['blob_name']}")
return {
"success": True,
"blob_name": args['blob_name'],
"output_blob": f"{sourcefile}-output.json"
}
else:
logging.error(f"Failed to write output to blob {args['blob_name']}")
return {
"success": False,
"error": "Failed to write output"
}
except Exception as e:
error_msg = f"Error writing output for blob {args['blob_name']}: {str(e)}"
logging.error(error_msg)
return {
"success": False,
"error": error_msg
}