def processComprehend()

in source/lambda/helper/python/comprehendHelper.py [0:0]


    def processComprehend(self,
                          bucket,
                          textractResponseLocation,
                          comprehendOutputPath,
                          maxPages=200):


        # get textract results from S3
        textractFile = S3Helper.readFromS3(
            bucket, textractResponseLocation)
        textract = json.loads(textractFile)

        # total number of textracted pages
        numOfPages = self.getNumOfPages(textract)

        # error
        if numOfPages <= 0:
            return False

        # enforce a maximum of pages to be processed
        if numOfPages > maxPages:
            numOfPages = maxPages

        # iterate over results page by page and extract raw text for comprehend
        # initialize rawPages with atleast a 1 character string helps prevent errors produced by comprehend and comprehend medical
        # comprehend and comprehend medical need text with atleast 1 character and infer_icd10_cm() needs a non empty string
        rawPages = ["."] * numOfPages
        if self.extractTextByPages(textract, rawPages, numOfPages) == False:
            return False

        # process pages by batches of 25 max, determine how many batches we need
        numOfBatches = int(numOfPages / PAGES_PER_BATCH)
        if numOfPages % PAGES_PER_BATCH != 0:
            numOfBatches += 1

        # to store comprehend and medical API calls results.
        comprehendEntities = [None] * numOfPages
        comprehendMedicalEntities = [None] * numOfPages
        comprehendMedicalICD10 = [None] * numOfPages

        pagesProcessed = 0

        # process pages by batch
        for batch in range(0, numOfBatches):

            pageStartIndex = batch * PAGES_PER_BATCH
            pagesToProcess = numOfPages - pagesProcessed

            if pagesToProcess > PAGES_PER_BATCH:
                pagesToProcess = PAGES_PER_BATCH

            # keep track of all threads we spawn
            threads = list()

            # Comprehend call that can batch up to 25 pages together synchronously
            x = threading.Thread(target=self.batchComprehendDetectEntitiesSync,
                                 args=(rawPages, pagesToProcess, pageStartIndex, comprehendEntities))
            x.start()
            threads.append(x)

            # comprehendMedicalEntities is shared among threads
            medicalEntitiesMutex = threading.Lock()

            # ComprehendMedical
            for index in range(0, pagesToProcess):

                # Comprehend Medical can only handle one page at a time synchronously. The SDK handles
                # throttling by the service.
                x = threading.Thread(target=self.comprehendMedicalDetectEntitiesSync,
                                     args=(rawPages,
                                           pageStartIndex + index,
                                           comprehendMedicalEntities,
                                           medicalEntitiesMutex))
                x.start()
                threads.append(x)

            # comprehendMedicalEntities is shared among threads
            medicalICD10Mutex = threading.Lock()

            # ComprehendMedical
            for index in range(0, pagesToProcess):

                # Comprehend Medical can only handle one page at a time synchronously. The SDK handles
                # throttling by the service.
                x = threading.Thread(target=self.comprehendMedicalDetectICD10Sync,
                                     args=(rawPages,
                                           pageStartIndex + index,
                                           comprehendMedicalICD10,
                                           medicalICD10Mutex))
                x.start()
                threads.append(x)

            # wait on all threads to finish their work
            for index, thread in enumerate(threads):
                thread.join()

            print("All threads joined...")

            # check success of threads
            for i in range(pageStartIndex, pagesToProcess):
                if (comprehendEntities[pageStartIndex + i] == None) or (comprehendMedicalEntities[pageStartIndex + i] == None):
                    print("Page failed to process" + str(i))
                    return False

            # increment the number of pages processed for the next batch
            pagesProcessed += pagesToProcess

        # process comprehend data, create the entities result file in S3
        processedComprehendData = self.processAndReturnComprehendEntities(comprehendEntities,
                                       numOfPages,
                                       bucket,
                                       comprehendOutputPath)
                                  
        # process comprehend medical data, create the entities result file in S3
        comprehendMedicalEntities = self.processAndReturnComprehendMedicalEntities(comprehendMedicalEntities,
                                              numOfPages,
                                              bucket,
                                              comprehendOutputPath)
        # final list of comprehend and comprehend medical entities to be indexed
        processedComprehendData.update(comprehendMedicalEntities)

        # process comprehend medical data, create the ICD10 result file in S3
        self.processComprehendMedicalICD10(comprehendMedicalICD10,
                                           numOfPages,
                                           bucket,
                                           comprehendOutputPath)

        return processedComprehendData