pipeline/function_app.py (58 lines of code) (raw):
import azure.functions as func
import azure.durable_functions as df
from activities import getBlobContent, runDocIntel, callAoai, writeToBlob
# Durable Functions app object; the HTTP starter, the orchestrators and the
# activity blueprints in this file are all registered on it.
# NOTE(review): the default HTTP auth level is ANONYMOUS, so the starter route
# presumably accepts calls without a function key - confirm this is intended.
app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
import logging
# An HTTP-triggered function with a Durable Functions client binding
@app.route(route="orchestrators/{functionName}")
@app.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client):
    """
    Starts a new orchestration instance and returns a response to the client.

    args:
        req (func.HttpRequest): The HTTP request object. Its JSON body must be an
            object with a non-empty "blobs" array; each entry is expected to
            carry the fields name, url, and container (not validated here).
            The orchestrator to start is taken from the {functionName} route
            parameter.
        client (DurableOrchestrationClient): The Durable Functions client.
    response:
        func.HttpResponse: A 400 response when the body is not valid JSON or
            does not contain a non-empty "blobs" array; otherwise the
            check-status response for the newly started instance.
    """
    try:
        body = req.get_json()
    except ValueError:
        # get_json() raises ValueError for a missing or malformed JSON body;
        # that is a client error, so answer 400 instead of failing with a 500.
        return func.HttpResponse(
            "Invalid request: body must be valid JSON.",
            status_code=400
        )
    logging.info(f"Request body: {body}")

    # Valid JSON that is not an object (e.g. a bare array) has no .get();
    # treat it the same as a body without a usable "blobs" entry.
    blobs = body.get("blobs") if isinstance(body, dict) else None

    # Validate the blobs array
    if not blobs or not isinstance(blobs, list):
        return func.HttpResponse(
            "Invalid request: 'blobs' must be a non-empty array.",
            status_code=400
        )

    function_name = req.route_params.get('functionName')
    instance_id = await client.start_new(function_name, client_input=blobs)
    logging.info(f"Started orchestration with ID = '{instance_id}'.")

    response = client.create_check_status_response(req, instance_id)
    return response
# Orchestrator
@app.function_name(name="orchestrator")
@app.orchestration_trigger(context_name="context")
def run(context):
    """
    Fans out one "ProcessBlob" sub-orchestration per input blob and waits for all of them.

    args:
        context: The orchestration context. Its input is iterated as the
            collection of blobs to process.
    response:
        Whatever context.task_all yields back for the scheduled sub-orchestrations.
    """
    input_data = context.get_input()

    # Orchestrator code is re-executed from the top each time it is replayed,
    # so every log call is gated on is_replaying to avoid duplicate entries.
    # The flag is re-read at each call site because it can change while the
    # function runs (replayed history first, then new work).
    if not context.is_replaying:
        logging.info(f"Context {context}")
        logging.info(f"Input data: {input_data}")

    sub_tasks = []
    for blob in input_data:
        if not context.is_replaying:
            logging.info(f"Calling sub orchestrator for blob: {blob}")
        sub_tasks.append(context.call_sub_orchestrator("ProcessBlob", blob))

    if not context.is_replaying:
        logging.info(f"Sub tasks: {sub_tasks}")

    # Runs a list of asynchronous tasks in parallel and waits for all of them to complete.
    # In this case, the tasks are sub-orchestrations that process each blob in parallel
    results = yield context.task_all(sub_tasks)

    if not context.is_replaying:
        logging.info(f"Results: {results}")
    return results
# Sub orchestrator
@app.function_name(name="ProcessBlob")
@app.orchestration_trigger(context_name="context")
def process_blob(context):
    """
    Processes a single blob by chaining three activities: runDocIntel, callAoai, writeToBlob.

    args:
        context: The orchestration context. Its input is the blob description;
            it must support blob["name"].
    response:
        dict: The keys "blob" (the input), "text_result" (the runDocIntel
            result) and "task_result" (the writeToBlob result).
    """
    blob = context.get_input()

    # Read the name up front so that a blob without one fails before any
    # activity is scheduled, rather than after the first two have already run.
    blob_name = blob["name"]

    # Orchestrator code is re-executed on replay; log only on the first pass.
    if not context.is_replaying:
        logging.info(f"Process Blob sub Orchestration - Processing blob: {blob}")

    # Waits for the result of the "runDocIntel" activity for this blob
    text_result = yield context.call_activity("runDocIntel", blob)

    # Feeds that result to the "callAoai" activity
    json_str = yield context.call_activity("callAoai", text_result)

    # Hands the "callAoai" output and the blob name to the "writeToBlob" activity
    task_result = yield context.call_activity(
        "writeToBlob",
        {
            "json_str": json_str,
            "blob_name": blob_name
        }
    )

    return {
        "blob": blob,
        "text_result": text_result,
        "task_result": task_result
    }
# Register the blueprint (`bp`) exposed by each activity module on the app,
# in the same order as the modules are imported.
for activity_module in (getBlobContent, runDocIntel, callAoai, writeToBlob):
    app.register_functions(activity_module.bp)