in source/lambda/helper/python/comprehendHelper.py [0:0]
def processComprehend(self,
                      bucket,
                      textractResponseLocation,
                      comprehendOutputPath,
                      maxPages=200):
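    """Run Comprehend and Comprehend Medical over Textract output stored in S3.

    Reads the Textract JSON from s3://<bucket>/<textractResponseLocation>,
    extracts the raw text page by page (processing at most maxPages pages),
    runs entity detection and ICD-10 inference in parallel batches, writes the
    result files under comprehendOutputPath, and returns the merged entities
    dict, or False on failure.
    """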
    # fetch the Textract results from S3
    textractFile = S3Helper.readFromS3(bucket, textractResponseLocation)
    textract = json.loads(textractFile)
    # total number of pages in the Textract output
    numOfPages = self.getNumOfPages(textract)
    # bail out if no pages were found
    if numOfPages <= 0:
        return False
    # enforce the maximum number of pages to process
    if numOfPages > maxPages:
        numOfPages = maxPages
    # iterate over the results page by page and extract the raw text for Comprehend.
    # rawPages is initialized with a 1-character string because Comprehend and
    # Comprehend Medical require text with at least 1 character, and
    # infer_icd10_cm() needs a non-empty string
    rawPages = ["."] * numOfPages
    if not self.extractTextByPages(textract, rawPages, numOfPages):
        return False
    # pages are processed in batches of at most PAGES_PER_BATCH (25);
    # determine how many batches are needed
    numOfBatches = numOfPages // PAGES_PER_BATCH
    if numOfPages % PAGES_PER_BATCH != 0:
        numOfBatches += 1
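    # e.g. with PAGES_PER_BATCH = 25: 60 pages -> 3 batches of 25, 25 and 10 pages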
    # storage for the results of the Comprehend and Comprehend Medical API calls
    comprehendEntities = [None] * numOfPages
    comprehendMedicalEntities = [None] * numOfPages
    comprehendMedicalICD10 = [None] * numOfPages
    pagesProcessed = 0
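    # each results list is indexed by absolute page number; an entry left as
    # None after its batch completes marks a page whose API call failed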
    # process the pages batch by batch
    for batch in range(numOfBatches):
        pageStartIndex = batch * PAGES_PER_BATCH
        # number of pages in this batch (the last batch may be smaller)
        pagesToProcess = min(numOfPages - pagesProcessed, PAGES_PER_BATCH)
        # keep track of all threads we spawn
        threads = list()
        # Comprehend can batch up to 25 pages together in one synchronous call
        x = threading.Thread(target=self.batchComprehendDetectEntitiesSync,
                             args=(rawPages, pagesToProcess, pageStartIndex, comprehendEntities))
        x.start()
        threads.append(x)
        # comprehendMedicalEntities is shared among the worker threads
        medicalEntitiesMutex = threading.Lock()
        # Comprehend Medical entity detection handles one page per synchronous
        # call; the SDK handles throttling by the service
        for index in range(pagesToProcess):
            x = threading.Thread(target=self.comprehendMedicalDetectEntitiesSync,
                                 args=(rawPages,
                                       pageStartIndex + index,
                                       comprehendMedicalEntities,
                                       medicalEntitiesMutex))
            x.start()
            threads.append(x)
        # comprehendMedicalICD10 is shared among the worker threads
        medicalICD10Mutex = threading.Lock()
        # Comprehend Medical ICD-10 inference handles one page per synchronous
        # call; the SDK handles throttling by the service
        for index in range(pagesToProcess):
            x = threading.Thread(target=self.comprehendMedicalDetectICD10Sync,
                                 args=(rawPages,
                                       pageStartIndex + index,
                                       comprehendMedicalICD10,
                                       medicalICD10Mutex))
            x.start()
            threads.append(x)
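        # at this point up to 1 + 2 * pagesToProcess threads (at most 51 per batch)
        # are in flight: one batched Comprehend call plus one Comprehend Medical
        # entities call and one ICD-10 call per page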
        # wait for all threads in this batch to finish their work
        for thread in threads:
            thread.join()
        print("All threads joined...")
        # verify that every page in this batch was processed successfully
        for i in range(pagesToProcess):
            page = pageStartIndex + i
            if (comprehendEntities[page] is None) or (comprehendMedicalEntities[page] is None):
                print("Page " + str(page) + " failed to process")
                return False
        # increment the number of pages processed for the next batch
        pagesProcessed += pagesToProcess
    # process the Comprehend data and create the entities result file in S3
    processedComprehendData = self.processAndReturnComprehendEntities(comprehendEntities,
                                                                      numOfPages,
                                                                      bucket,
                                                                      comprehendOutputPath)
    # process the Comprehend Medical data and create the entities result file in S3
    comprehendMedicalEntities = self.processAndReturnComprehendMedicalEntities(comprehendMedicalEntities,
                                                                               numOfPages,
                                                                               bucket,
                                                                               comprehendOutputPath)
    # merge into the final dict of Comprehend and Comprehend Medical entities to be indexed
    processedComprehendData.update(comprehendMedicalEntities)
    # process the Comprehend Medical ICD-10 data and create the ICD-10 result file in S3
    self.processComprehendMedicalICD10(comprehendMedicalICD10,
                                       numOfPages,
                                       bucket,
                                       comprehendOutputPath)
    return processedComprehendData
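
# A minimal usage sketch (hypothetical names: assumes this method is defined on
# the ComprehendHelper class in this module, and that the Textract JSON already
# exists at the given S3 key; the bucket and keys below are illustrative only):
#
#   helper = ComprehendHelper()
#   entities = helper.processComprehend(bucket="my-doc-bucket",
#                                       textractResponseLocation="textract/doc1-response.json",
#                                       comprehendOutputPath="comprehend/doc1/",
#                                       maxPages=200)
#   if entities is False:
#       print("Comprehend processing failed")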