in src/pre_human_task_lambda.py [0:0]
def lambda_handler(event, context):
    """
    Sample PreHumanTaskLambda (pre-processing lambda) for custom labeling jobs.

    For custom AWS SageMaker Ground Truth labeling jobs, you have to specify a PreHumanTaskLambda
    (pre-processing lambda). AWS SageMaker invokes this lambda for each item to be labeled. The output of this
    lambda is merged with the specified custom UI template. This code assumes that the specified custom template
    has only one placeholder, "taskObject". If your UI template has more parameters, please modify the output of
    this lambda.

    Parameters
    ----------
    event: dict, required
        Content of event looks something like the following

        {
           "version":"2018-10-16",
           "labelingJobArn":"<your labeling job ARN>",
           "dataObject":{
              "source-ref":"s3://<your bucket>/<your keys>/awesome.pdf",
              "page": "<page number, if not provided will default to 1>",
              "metadata": {
                  "pages": "<total # of pages in the PDF>",
                  "use-textract-only": <True or False>,
                  "labels": <list of label strings>
              },
              "annotator-metadata": <dictionary defined during job creation>,
              "primary-annotation-ref": "<S3 Uri for primary annotation>" or None,
              "secondary-annotation-ref": "<S3 Uri for secondary annotation>" or None
           }
        }

        Note that event["dataObject"]["metadata"] is updated in place ("page", "source_ref", "document_id" and
        "use-textract-only" are written to it) and then returned inside the task object.

        As the SageMaker product evolves, the content of the event object will change. For the latest version
        refer to the following URL
        Event doc: https://docs.aws.amazon.com/sagemaker/latest/dg/sms-custom-templates-step3.html

    context: object, required
        Lambda Context runtime methods and attributes (not used by this handler)
        Context doc: https://docs.aws.amazon.com/lambda/latest/dg/python-context-object.html

    Returns
    -------
    output: dict
        This output is an example JSON. We assume that your template has only one placeholder named "taskObject".
        If your template has more than one placeholder, make sure to add one more attribute under "taskInput"

        {
           "taskInput":{
              "taskObject": {
                  "pdfBase64S3Ref": <S3 reference to the PDF page's base64 string>,
                  "pdfBlocksS3Ref": <S3 reference to the PDF page's block objects>,
                  "pdfType": <NativePDF or ScannedPDF>,
                  "version": <value of the module-level VERSION constant>,
                  "metadata": <the (updated) metadata of the inputted dataObject>,
                  "annotatorMetadata": <"annotator-metadata" of the dataObject, or None>,
                  "primaryAnnotationS3Ref": <S3 Uri of the most recent annotation, or None>,
                  "secondaryAnnotationS3Ref": <S3 Uri of the other annotation, or None>
              },
              "labels": <list of label strings>
           },
           "humanAnnotationRequired":"true"
        }

        When the dataObject has no (or an empty) "source-ref", nothing is read from or written to S3; the PDF
        related fields of "taskObject" are None, "version" is omitted, and "humanAnnotationRequired" is "false".

        Note: Output of this lambda will be merged with the template you specify in your labeling job.
        You can use the preview button on the SageMaker Ground Truth console to make sure the merge is successful.
        Return doc: https://docs.aws.amazon.com/sagemaker/latest/dg/sms-custom-templates-step3.html

    Raises
    ------
    KeyError
        If "labelingJobArn", "dataObject" or the dataObject's "metadata" is missing, or if "labels" is missing
        from the metadata of an item that has a "source-ref".
    ValueError
        If "page" is present but cannot be converted to an integer.
    """
    # Event received
    print("Received event: " + json.dumps(event, indent=2))

    job_arn = event['labelingJobArn']
    job_id = job_arn.split('/')[-1]
    print(f'labeling job id = {job_id}')

    data_obj = event["dataObject"]
    metadata = data_obj['metadata']

    page_num = int(data_obj.get("page", 1))
    metadata['page'] = str(page_num)

    # Get source-ref if specified
    source_ref = data_obj.get("source-ref")
    metadata["source_ref"] = source_ref

    # The flag is handed to the UI template as a lowercase string
    use_textract_only = metadata.get("use-textract-only", False)
    metadata["use-textract-only"] = "true" if use_textract_only else "false"

    # Without a source-ref there is no PDF to process: mark the item as failed before any path parsing or
    # S3 access is attempted (both would raise on a missing reference).
    if not source_ref:
        metadata["document_id"] = None
        print(" Failed to pre-process {} !".format(event["labelingJobArn"]))
        return {
            "taskInput": {
                "taskObject": {
                    "pdfBase64S3Ref": None,
                    "pdfBlocksS3Ref": None,
                    "pdfType": None,
                    "metadata": metadata,
                    "annotatorMetadata": data_obj.get("annotator-metadata"),
                    "primaryAnnotationS3Ref": data_obj.get("primary-annotation-ref"),
                    "secondaryAnnotationS3Ref": data_obj.get("secondary-annotation-ref"),
                },
                # tolerate missing labels here so that reporting the failure cannot itself fail
                "labels": metadata.get("labels", []),
            },
            "humanAnnotationRequired": "false",
        }

    # document_id will consist of '{filename}_{page #}'
    doc_filename = os.path.splitext(os.path.basename(source_ref))[0]
    metadata["document_id"] = f"{doc_filename}_{metadata['page']}"

    # Log the access permissions on POPPLER_PATH
    # (presumably the Poppler location used while extracting PDF blocks -- confirm)
    print(f"POPPLER_PATH={POPPLER_PATH} file permission info")
    print(f"READ Permission: {os.access(POPPLER_PATH, os.R_OK)}")
    print(f"WRITE Permission: {os.access(POPPLER_PATH, os.W_OK)}")
    print(f"EXEC Permission: {os.access(POPPLER_PATH, os.X_OK)}")

    # create low-level S3 client
    s3 = boto3.client('s3')

    pdf_s3_resp = get_s3_object(s3, source_ref)
    pdf_bytes = pdf_s3_resp['Body'].read()
    print(f"pdf_bytes length = {len(pdf_bytes)}")
    pdf_page_base64 = base64.b64encode(pdf_bytes)

    do_ocr = True

    # Decide whether to extract blocks from input jobs' blocks or from PDF file
    primary_annotation_ref = data_obj.get("primary-annotation-ref")
    if primary_annotation_ref:
        do_ocr = False
        primary_annotation_s3_resp = get_s3_object(s3, primary_annotation_ref)

        # use pdf blocks from most recently modified annotation file
        secondary_annotation_ref = data_obj.get("secondary-annotation-ref")
        if secondary_annotation_ref:
            secondary_annotation_s3_resp = get_s3_object(s3, secondary_annotation_ref)
            primary_annotation_date = primary_annotation_s3_resp["LastModified"]
            secondary_annotation_date = secondary_annotation_s3_resp["LastModified"]

            # on a tie the primary annotation wins
            if primary_annotation_date >= secondary_annotation_date:
                annotation_bytes = primary_annotation_s3_resp["Body"].read()
            else:
                annotation_bytes = secondary_annotation_s3_resp["Body"].read()
                # set most recent annotation file as primary annotation reference
                data_obj['primary-annotation-ref'], data_obj['secondary-annotation-ref'] = (
                    data_obj['secondary-annotation-ref'],
                    data_obj['primary-annotation-ref'],
                )
        else:
            annotation_bytes = primary_annotation_s3_resp["Body"].read()

        annotation_obj = json.loads(annotation_bytes.decode('utf-8'))
        if "Blocks" in annotation_obj:
            pdf_blocks = annotation_obj["Blocks"]
            # an annotation file without "DocumentType" is treated as a native PDF
            is_native_pdf = annotation_obj.get("DocumentType", PDFType.NativePDF.name) == PDFType.NativePDF.name
        else:
            print('Remove annotation references as no extracted blocks are found in the latest annotation file.')
            data_obj.pop("primary-annotation-ref", None)
            data_obj.pop("secondary-annotation-ref", None)
            # fall back to extracting the blocks from the PDF itself
            do_ocr = True

    if do_ocr:
        print(f'Attempting OCR with use-textract-only: {use_textract_only}')
        pdf_blocks, is_native_pdf = get_pdf_blocks(pdf_bytes, page_num, use_textract_only)

    # create intermediate files and write to S3
    pdf_base64_s3_ref = output_pdf_temp_file_to_s3(s3, source_ref, pdf_page_base64, page_num, job_id)
    pdf_block_s3_ref = output_pdf_temp_file_to_s3(s3, source_ref, pdf_blocks, page_num, job_id)

    task_object = {
        "pdfBase64S3Ref": pdf_base64_s3_ref,
        "pdfBlocksS3Ref": pdf_block_s3_ref,
        "pdfType": PDFType.NativePDF.name if is_native_pdf else PDFType.ScannedPDF.name,
        "version": VERSION,
        "metadata": metadata,
        "annotatorMetadata": data_obj.get("annotator-metadata"),
        # read after the swap/pop above, so these reflect the references actually used
        "primaryAnnotationS3Ref": data_obj.get("primary-annotation-ref"),
        "secondaryAnnotationS3Ref": data_obj.get("secondary-annotation-ref"),
    }
    print(task_object)

    # Build response object
    output = {
        "taskInput": {
            "taskObject": task_object,
            "labels": metadata["labels"],
        },
        "humanAnnotationRequired": "true",
    }
    return output