in pipeline/ocr/fn-call-textract/main.py
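
# --- Illustrative module-level setup (sketch only, not this project's actual code) ---
# handle_request() below depends on module-level names defined elsewhere in this
# file. The definitions here show one plausible shape for them; all env var
# names, defaults, and types are assumptions. send_result() (used on the
# synchronous path) is likewise defined elsewhere and is not sketched here.
import hashlib
import logging
import os
import time

import boto3

logger = logging.getLogger(__name__)

textract = boto3.client("textract")
ddb_state_cache_table = boto3.resource("dynamodb").Table(
    os.environ["STATE_CACHE_TABLE"]  # Hypothetical env var name
)

default_textract_features = []  # Assumed default: no analysis features (plain text detection)
force_analyze_apis = os.environ.get("FORCE_ANALYZE_APIS", "") == "true"  # Assumed flag
is_textract_sync = os.environ.get("TEXTRACT_SYNC", "") == "true"  # Assumed flag
callback_sns_role_arn = os.environ.get("CALLBACK_SNS_ROLE_ARN")  # Hypothetical env var name
callback_sns_topic_arn = os.environ.get("CALLBACK_SNS_TOPIC_ARN")  # Hypothetical env var name
STATE_CACHE_TTL_SECS = int(os.environ.get("STATE_CACHE_TTL_SECS", "0"))  # 0 disables expiry stamping


class MalformedRequest(ValueError):
    """Raised when the incoming event is missing required fields or has bad values."""
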
def handle_request(event, context):
    try:
        srcbucket = event["Input"]["Bucket"]
        srckey = event["Input"]["Key"]
        task_token = event["TaskToken"]
        textract_features = event["Output"].get("Features", default_textract_features)
        output_type = event["Output"].get("Type", "s3")
        output_type_lower = output_type.lower()
        if output_type_lower == "inline":
            # No other config to collect
            destbucket = None
            destkey = None
        elif output_type_lower == "s3":
            destbucket = event["Output"].get("Bucket", srcbucket)
            if event["Output"].get("Key"):
                destkey = event["Output"]["Key"]
            else:
                prefix = event["Output"].get("Prefix", "")
                if prefix and not prefix.endswith("/"):
                    prefix += "/"
                destkey = "".join([prefix, srckey, ".textract.json"])
        else:
            raise MalformedRequest(
                f"Unknown output integration type '{output_type}': Expected 'Inline' or 'S3'"
            )
    except KeyError as ke:
        raise MalformedRequest(f"Missing field {ke}, please check your input payload")
    input_doc = {
        "S3Object": {
            "Bucket": srcbucket,
            "Name": srckey,
        },
    }
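    # The same S3 pointer shape is accepted as `Document` by Textract's
    # synchronous APIs and as `DocumentLocation` by the asynchronous ones.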
    if is_textract_sync:
        if textract_features or force_analyze_apis:
            result = textract.analyze_document(
                Document=input_doc,
                FeatureTypes=textract_features,
            )
        else:
            result = textract.detect_document_text(
                Document=input_doc,
            )
        return send_result(
            textract_result=result,
            sfn_task_token=task_token,
            dest_bucket=destbucket,
            dest_key=destkey,
        )
    else:
        # ClientRequestToken allows idempotency in case of retries - but only up to 64 chars (so
        # we can't use the TaskToken). Different use cases might have different idempotency needs,
        # but remember that Textract won't re-trigger an SNS notification for a job it remembers.
        # By default we'll enforce idempotency by source location + target location (or current
        # timestamp if target location not provided), and provide an additional `IdempotencySalt`
        # escape hatch in case users need to force re-processing:
        urihash = hashlib.sha256()
        urihash.update(f"s3://{srcbucket}/{srckey}".encode("utf-8"))
        if destbucket:
            urihash.update(f"s3://{destbucket}/{destkey}".encode("utf-8"))
        if "IdempotencySalt" in event:
            urihash.update(event["IdempotencySalt"].encode("utf-8"))
        elif not destbucket:
            urihash.update(f"{time.time()}".encode("utf-8"))
        # TODO: Any way we could catch an idempotency issue and pro-actively return result?
        start_params = {
            "DocumentLocation": input_doc,
            "ClientRequestToken": urihash.hexdigest(),
            # "JobTag": "Howdy",
            "NotificationChannel": {
                "RoleArn": callback_sns_role_arn,
                "SNSTopicArn": callback_sns_topic_arn,
            },
        }
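        # Textract will publish job completion to this SNS topic; a separate
        # handler (not shown here) is expected to fetch the results and redeem
        # the Step Functions task token cached below.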
logger.info("Textract params: %s (features = %s)", start_params, textract_features)
if len(textract_features) or force_analyze_apis:
job = textract.start_document_analysis(
FeatureTypes=textract_features,
**start_params,
)
else:
job = textract.start_document_text_detection(**start_params)
logger.info(
"Started async job %s for s3://%s/%s\n%s",
job.get("JobId"),
srcbucket,
srckey,
job,
)
        cache_data = {
            "TextractJobId": job["JobId"],
            "SFnTaskToken": task_token,
            "OutputType": output_type_lower,
        }
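        # ExpiresAt is presumably the DynamoDB table's configured TTL attribute
        # (an assumption from the name), letting stale entries expire automatically.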
        if STATE_CACHE_TTL_SECS:
            cache_data["ExpiresAt"] = int(time.time()) + STATE_CACHE_TTL_SECS
        if output_type_lower == "s3":
            cache_data["OutputS3Bucket"] = destbucket
            cache_data["OutputS3Key"] = destkey
        ddb_state_cache_table.put_item(
            Item=cache_data,
            ReturnValues="NONE",
        )
        return cache_data
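

# Illustrative local smoke test: a sketch that assumes AWS credentials, the
# module setup sketched above, and a real bucket/key. Values are placeholders.
if __name__ == "__main__":
    print(
        handle_request(
            {
                "Input": {"Bucket": "doc-source-bucket", "Key": "scans/doc-001.pdf"},
                "Output": {"Type": "Inline"},
                "TaskToken": "local-test-task-token",
            },
            None,  # Lambda context object is unused by handle_request
        )
    )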