in source/lambda_handlers/02-ComprehendA2I.py

import json
import tarfile
import time
from io import BytesIO

import boto3


def lambda_handler(event, context):
    # Create an SSM client
    ssm_client = boto3.client('ssm')
    # Create an A2I client
    a2i_client = boto3.client('sagemaker-a2i-runtime')
    # Get parameters from SSM
    comprehend_parameters = ssm_client.get_parameters(
        Names=['FlowDefARN-TCA2I', 'S3BucketName-TCA2I'],
        WithDecryption=True)
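    # Note: get_parameters returns {'Parameters': [...], 'InvalidParameters': [...]};
    # names that cannot be resolved land in 'InvalidParameters' rather than
    # raising, so a missing parameter would only surface later as a NameError.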
    for parameter in comprehend_parameters['Parameters']:
        if parameter['Name'] == 'FlowDefARN-TCA2I':
            hrw_arn = parameter['Value']
        elif parameter['Name'] == 'S3BucketName-TCA2I':
            primary_s3_bucket = parameter['Value']
    # Create an S3 client
    s3_client = boto3.client('s3')
    # Get details of the object that was just created by Comprehend
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']
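    # The handler assumes an S3 put-notification event, abridged:
    #   {"Records": [{"s3": {"bucket": {"name": ...}, "object": {"key": ...}}}]}
    # S3 URL-encodes object keys in notifications; keys containing spaces or
    # special characters would need urllib.parse.unquote_plus(key) first.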
    # Derive a unique key for the extracted results from the Comprehend
    # job's output folder name
    extracted_file_key = 'comprehend-output/raw/' + key.split("/")[2] + "-results"
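    # Assumption: Comprehend writes its archive under a key such as
    # <prefix>/<account-id>-NER-<job-id>/output/output.tar.gz, so the path
    # segment index used above depends on the configured output prefix; a
    # different prefix depth would require a different index.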
    # Load the .tar.gz archive that Comprehend Custom Entity Recognition generated
    input_tar_file = s3_client.get_object(Bucket=bucket, Key=key)
    input_tar_content = input_tar_file['Body'].read()
    # Extract the archive and save the contents in the primary bucket
    with tarfile.open(fileobj=BytesIO(input_tar_content)) as tar:
        for tar_resource in tar:
            if tar_resource.isfile():
                inner_file_bytes = tar.extractfile(tar_resource).read()
                s3_client.upload_fileobj(
                    BytesIO(inner_file_bytes),
                    Bucket=primary_s3_bucket,
                    Key=extracted_file_key)
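    # Every member is written to the same key, so if the archive held more
    # than one file only the last would survive; Comprehend's output.tar.gz
    # normally contains a single 'output' file, which makes this safe.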
    # Load the Comprehend results back from the primary data source bucket
    custom_entities_file = s3_client.get_object(Bucket=primary_s3_bucket, Key=extracted_file_key)
    custom_entities_recognition_results = json.loads(custom_entities_file['Body'].read())
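    # Each line of a Comprehend entities job's output is a JSON document,
    # shaped roughly like (abridged):
    #   {"File": "doc.txt", "Line": 0,
    #    "Entities": [{"BeginOffset": 10, "EndOffset": 16, "Score": 0.99,
    #                  "Text": "pump", "Type": "DEVICE"}]}
    # The json.loads call above therefore assumes a single-line result file.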
    # Load the original text extracted using Amazon Textract
    file_identifier = custom_entities_recognition_results['File']
    textract_results_key = 'textract-output/processed/' + file_identifier
    text_file_object = s3_client.get_object(Bucket=primary_s3_bucket, Key=textract_results_key)
    original_text_file = text_file_object['Body'].read().decode("utf-8", 'ignore')
    # Initialize the human loop input object
    human_loop_input = {}
    human_loop_input['originalText'] = original_text_file
    # Add the list of entities Comprehend identified
    human_loop_input['entities'] = custom_entities_recognition_results['Entities']
    # Add the entity types that reviewers can label
    human_loop_input['labels'] = [{'label': 'device', 'shortDisplayName': 'dvc',
                                   'fullDisplayName': 'Device'}]
    # Pre-annotate the entities Comprehend already identified so human
    # reviewers do not have to start from scratch
    existing_entities = []
    for entity in human_loop_input['entities']:
        current_entity = {}
        current_entity['label'] = entity['Type'].lower()
        current_entity['startOffset'] = entity['BeginOffset']
        current_entity['endOffset'] = entity['EndOffset']
        existing_entities.append(current_entity)
    human_loop_input['initialValue'] = existing_entities
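    # The 'labels' and 'initialValue' field names above follow the schema of
    # the crowd-entity-annotation element used in A2I custom worker templates;
    # the template referenced by the flow definition is expected to read them
    # via task.input.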
    # Create a unique human loop name from a millisecond timestamp
    human_loop_name = 'TCA2I-' + str(int(round(time.time() * 1000)))
    print('Starting human loop - ' + human_loop_name)
    response = a2i_client.start_human_loop(
        HumanLoopName=human_loop_name,
        FlowDefinitionArn=hrw_arn,
        HumanLoopInput={
            'InputContent': json.dumps(human_loop_input)
        }
    )
    # The response carries the HumanLoopArn of the loop that was just started
    return 0
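

# Minimal local-invocation sketch (not part of the deployed handler): the
# bucket and key below are placeholder assumptions standing in for a real
# Comprehend output object, and AWS credentials plus the two SSM parameters
# must already exist for the call to succeed.
if __name__ == '__main__':
    sample_event = {
        'Records': [{
            's3': {
                'bucket': {'name': 'example-comprehend-output-bucket'},
                'object': {'key': 'comprehend-output/123456789012-NER-abc123/output/output.tar.gz'}
            }
        }]
    }
    lambda_handler(sample_event, None)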