def runComprehend()

in code/comprehend_sync/comprehend_processor.py [0:0]


def runComprehend(bucketName, objectName, callerId):
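    """Run Comprehend key phrase and entity detection over a Textract output object,
    index the per-page results in Elasticsearch, and write the Comprehend payload
    back to S3."""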
    
    comprehend = AwsHelper().getClient('comprehend')
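    # Derive the documentId and document name from the object key and verify that the
    # key matches the documentId tag on the triggering S3 object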
    documentId, documentName = dissectObjectName(objectName)
    assert (documentId == S3Helper().getTagsS3(bucketName, objectName).get('documentId', None)), "File path {} does not match the expected documentId tag of the object triggered.".format(objectName)
    
    # Load the Textract output JSON from S3 and wrap it in the output generator
    textractOutputJson = json.loads(S3Helper().readFromS3(bucketName, objectName))
    og = OutputGenerator(response=textractOutputJson, forms=False, tables=False)
    
    # Mark this document's stage as in progress in the pipeline tracker
    pipeline_client.body = {
        "documentId": documentId,
        "bucketName": bucketName,
        "objectName": objectName,
        "stage":      PIPELINE_STAGE
    }
    pipeline_client.stageInProgress()    
    
    # Wrap the Textract output in a Document and build the output key, URL, and
    # tagging for the Comprehend results
    document = Document(textractOutputJson)
    originalFileName = "{}/{}".format(documentId, documentName)
    comprehendFileName = originalFileName + "/comprehend-output.json"
    comprehendFileS3Url = "https://{}.s3.amazonaws.com/{}".format(comprehendBucket, urllib.parse.quote_plus(comprehendFileName, safe="/"))
    tagging = "documentId={}".format(documentId)
    
    # Connect to Elasticsearch and accumulate one indexable payload entry per page
    es.connect()
    esPayload = []
    page_num = 1
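    # Structure each page's tables, forms, and text, then run Comprehend key phrase
    # and entity detection on the page text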
    for page in document.pages:
        table = og.structurePageTable(page)
        forms = og.structurePageForm(page)
        text = og.structurePageText(page)

        keyPhrases = []
        entitiesDetected = {}
        
        lenOfEncodedText = len(text)
        print("Comprehend documentId {} processing page {}".format(documentId, str(page_num)))
        print("Length of encoded text is " + str(lenOfEncodedText))
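        # Comprehend enforces a per-request character limit: skip empty pages, chunk
        # oversized pages into multiple requests, otherwise send the page text in one call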
        if lenOfEncodedText == 0:
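            # Nothing to analyze on an empty page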
            pass
        elif lenOfEncodedText > COMPREHEND_CHARACTER_LIMIT:
            print("Size was too big to run singularly; breaking up the page text into chunks")
            try:
                chunksOfText = chunkUpTheText(text)
            except Exception as e:
                pipeline_client.stageFailed("Could not determine how to snip the text on page {} into chunks.".format(page_num))
                raise e
            keyPhrases, entitiesDetected = batchSendToComprehend(comprehend, chunksOfText, 'en')
        else:
            keyPhrases, entitiesDetected = singularSendToComprehend(comprehend, text, 'en')
            
        # Merge the Comprehend results with the structured page content into a single
        # Elasticsearch document
        esPageLoad = compileESPayload(es, page_num, keyPhrases, entitiesDetected, text, table, forms, documentId)
        esPayload.append(esPageLoad)
        page_num = page_num + 1
    
    # Bulk-index the per-page payloads into Elasticsearch, failing the stage on error
    try:
        es.post_bulk(index=esIndex, payload=esPayload)
    except Exception as e:
        pipeline_client.stageFailed("Could not post to Elasticsearch")
        raise e
    
    print("Data uploaded to ES")
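    # Persist the full Comprehend payload to S3, tagged with the documentId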
    try:
        S3Helper().writeToS3(json.dumps(esPayload), comprehendBucket, comprehendFileName, taggingStr=tagging)
    except Exception as e:
        pipeline_client.stageFailed("Failed to write comprehend payload to S3")
        raise e
        
    # Record lineage from the source Textract object to the Comprehend output,
    # then mark the stage as succeeded
    lineage_client.recordLineage({
        "documentId":       documentId,
        "callerId":         callerId,
        "sourceBucketName": bucketName,
        "targetBucketName": comprehendBucket,
        "sourceFileName":   objectName,
        "targetFileName":   comprehendFileName
    })
    pipeline_client.stageSucceeded()
    print("Comprehend data uploaded to S3 at {}".format(comprehendFileName))
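
For context, runComprehend is driven by the bucket and key of the Textract output object for a document. The snippet below is a minimal sketch of how an S3-triggered Lambda entry point might call it; the handler name, the event parsing, the import path, and the use of the invoked function ARN as callerId are assumptions, not code from this file.

# Illustrative sketch only: handler name, import path, and callerId choice are assumptions
import urllib.parse

from comprehend_processor import runComprehend  # hypothetical import path

def lambda_handler(event, context):
    for record in event.get("Records", []):
        bucketName = record["s3"]["bucket"]["name"]
        # S3 event notification keys are URL-encoded; decode before reuse
        objectName = urllib.parse.unquote_plus(record["s3"]["object"]["key"])
        runComprehend(bucketName, objectName, context.invoked_function_arn)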