in pipeline/review/fn-start-review/main.py [0:0]
def handler(event, context):
    """Start an A2I human review loop for one task and return the new loop's ARN.

    Args:
        event: Request payload (dict-like). Required keys are "TaskToken", "ModelResult" and
            "TaskObject"; "FlowDefinitionArn" is optional. "TaskObject" is either used as-is or,
            when it is a dict, must carry a non-empty "S3Uri" or both "Bucket" and "Key" (which
            are combined into an s3:// URI).
        context: Invocation context; unused by this function.

    Returns:
        The "HumanLoopArn" field of the `a2i.start_human_loop` response.

    Raises:
        MalformedRequest: If a required field is missing, if a dict "TaskObject" has neither a
            usable "S3Uri" nor a "Bucket"/"Key" pair, or if no flow definition ARN can be
            resolved from the request or from the SSM parameter named by
            `default_flow_definition_arn_param`.
    """
    # NOTE: any KeyError raised inside this block (including while unpacking the SSM response)
    # is reported to the caller as a missing payload field.
    try:
        task_token = event["TaskToken"]
        model_result = event["ModelResult"]
        task_object = event["TaskObject"]

        # Normalize dict-style object references down to a single s3:// URI string.
        if isinstance(task_object, dict):
            if task_object.get("S3Uri"):
                task_object = task_object["S3Uri"]
            elif "Bucket" in task_object and "Key" in task_object:
                task_object = f"s3://{task_object['Bucket']}/{task_object['Key']}"
            else:
                raise MalformedRequest(
                    "TaskObject must be an s3://... URI string OR an object with 'S3Uri' key or "
                    f"both 'Bucket' and 'Key' keys. Got {task_object}"
                )

        task_input = {
            "TaskObject": task_object,
            "TaskToken": task_token,  # Not used within A2I, but for feed-through to callback fn
            "ModelResult": model_result,
        }

        # An ARN given explicitly in the request takes precedence over the SSM-stored default.
        if "FlowDefinitionArn" in event:
            flow_definition_arn = event["FlowDefinitionArn"]
        elif default_flow_definition_arn_param:
            flow_definition_arn = ssm.get_parameter(Name=default_flow_definition_arn_param)[
                "Parameter"
            ]["Value"]
            # Placeholder values in the parameter are treated the same as "not configured".
            if (not flow_definition_arn) or flow_definition_arn.lower() in ("undefined", "null"):
                raise MalformedRequest(
                    "Neither request FlowDefinitionArn nor expected SSM parameter are set. Got: "
                    f"{default_flow_definition_arn_param} = '{flow_definition_arn}'"
                )
        else:
            raise MalformedRequest(
                "FlowDefinitionArn not specified in request and DEFAULT_FLOW_DEFINITION_ARN_PARAM "
                "env var not set"
            )
    except KeyError as ke:
        # Chain explicitly so the original lookup failure is shown as the direct cause.
        raise MalformedRequest(
            f"Missing field {ke}, please check your input payload"
        ) from ke

    logger.info(f"Starting A2I human loop with input {task_input}")
    a2i_response = a2i.start_human_loop(
        HumanLoopName=generate_human_loop_name(task_input["TaskObject"]),
        FlowDefinitionArn=flow_definition_arn,
        HumanLoopInput={"InputContent": json.dumps(task_input)},
        # If adapting this code for use with A2I public workforce, you may need to add additional
        # content classifiers as described here:
        # https://docs.aws.amazon.com/sagemaker/latest/dg/sms-workforce-management-public.html
        # https://docs.aws.amazon.com/augmented-ai/2019-11-07/APIReference/API_HumanLoopDataAttributes.html
        # DataAttributes={
        #     "ContentClassifiers": ["FreeOfPersonallyIdentifiableInformation"]
        # }
    )
    logger.info(f"Human loop started: {a2i_response}")
    # Doesn't really matter what we return because Step Functions will wait for the callback with
    # the token!
    return a2i_response["HumanLoopArn"]