in code/comprehend_sync/comprehend_processor.py [0:0]
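# Stdlib imports this function needs (a sketch; the module's real import block,
# and helpers such as AwsHelper, S3Helper, OutputGenerator, Document, and the
# pipeline/lineage/es clients, are assumed to be defined elsewhere in the package):
import json
import urllib.parse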
def runComprehend(bucketName, objectName, callerId):
    comprehend = AwsHelper().getClient('comprehend')
    documentId, documentName = dissectObjectName(objectName)
    # The documentId encoded in the object key must match the object's documentId tag.
    assert documentId == S3Helper().getTagsS3(bucketName, objectName).get('documentId', None), \
        "File path {} does not match the expected documentId tag of the object triggered.".format(objectName)
    # Load the Textract output JSON that triggered this stage.
    textractOutputJson = json.loads(S3Helper().readFromS3(bucketName, objectName))
    og = OutputGenerator(response=textractOutputJson, forms=False, tables=False)
    # Mark this document as in progress for the current pipeline stage.
    pipeline_client.body = {
        "documentId": documentId,
        "bucketName": bucketName,
        "objectName": objectName,
        "stage": PIPELINE_STAGE
    }
    pipeline_client.stageInProgress()
    document = Document(textractOutputJson)
    # Comprehend output lands next to the original document, tagged with its id.
    originalFileName = "{}/{}".format(documentId, documentName)
    comprehendFileName = originalFileName + "/comprehend-output.json"
    comprehendFileS3Url = "https://{}.s3.amazonaws.com/{}".format(
        comprehendBucket, urllib.parse.quote_plus(comprehendFileName, safe="/"))
    tagging = "documentId={}".format(documentId)
    es.connect()
    esPayload = []
    for page_num, page in enumerate(document.pages, start=1):
        table = og.structurePageTable(page)
        forms = og.structurePageForm(page)
        text = og.structurePageText(page)
        keyPhrases = []
        entitiesDetected = {}
        lenOfEncodedText = len(text)
        print("Comprehend documentId {} processing page {}".format(documentId, page_num))
        print("Length of encoded text is " + str(lenOfEncodedText))
        if lenOfEncodedText == 0:
            # Nothing to analyze on an empty page; index it with empty results.
            pass
        elif lenOfEncodedText > COMPREHEND_CHARACTER_LIMIT:
            # Page text exceeds Comprehend's per-request character limit,
            # so split it into chunks and batch the calls.
            print("Size was too big to run singularly; breaking up the page text into chunks")
            try:
                chunksOfText = chunkUpTheText(text)
            except Exception:
                pipeline_client.stageFailed("Could not determine how to snip the text on page {} into chunks.".format(page_num))
                raise
            keyPhrases, entitiesDetected = batchSendToComprehend(comprehend, chunksOfText, 'en')
        else:
            keyPhrases, entitiesDetected = singularSendToComprehend(comprehend, text, 'en')
        # Collect this page's results for a single bulk Elasticsearch request.
        esPageLoad = compileESPayload(es, page_num, keyPhrases, entitiesDetected, text, table, forms, documentId)
        esPayload.append(esPageLoad)
    try:
        es.post_bulk(index=esIndex, payload=esPayload)
    except Exception:
        pipeline_client.stageFailed("Could not post to Elasticsearch")
        raise
    print("Data uploaded to ES")
    # Persist the same payload to S3 so downstream stages can reuse it.
    try:
        S3Helper().writeToS3(json.dumps(esPayload), comprehendBucket, comprehendFileName, taggingStr=tagging)
    except Exception:
        pipeline_client.stageFailed("Failed to write comprehend payload to S3")
        raise
    # Record source-to-target lineage, then mark the stage as complete.
    lineage_client.recordLineage({
        "documentId": documentId,
        "callerId": callerId,
        "sourceBucketName": bucketName,
        "targetBucketName": comprehendBucket,
        "sourceFileName": objectName,
        "targetFileName": comprehendFileName
    })
    pipeline_client.stageSucceeded()
    print("Comprehend data uploaded to S3 at {}".format(comprehendFileS3Url))