in caller/textractcaller/t_call.py [0:0]
def _call_sync_api(textract, params: dict, features) -> dict:
    """Call AnalyzeDocument when features are requested, DetectDocumentText otherwise."""
    if features:
        return textract.analyze_document(**params)
    return textract.detect_document_text(**params)


def _split_s3_url(s3_url: str) -> tuple:
    """Split ``s3://<bucket>/<key>`` into ``(bucket, key)``, raising ValueError if malformed."""
    # Strip the scheme by position: the caller matches it case-insensitively, and a
    # str.replace would miss "S3://" and also remove "s3://" occurring inside the key.
    bucket, sep, key = s3_url[len("s3://"):].partition("/")
    if not bucket or not sep or not key:
        raise ValueError(f"S3 location has to be of the form s3://<bucket>/<key>, got: {s3_url}")
    return bucket, key


def call_textract(input_document: Union[str, bytes],
                  features: List[Textract_Features] = None,
                  output_config: OutputConfig = None,
                  kms_key_id: str = None,
                  job_tag: str = None,
                  notification_channel: NotificationChannel = None,
                  client_request_token: str = None,
                  return_job_id: bool = False,
                  force_async_api: bool = False,
                  boto3_textract_client=None,
                  job_done_polling_interval=1) -> dict:
    """Call Amazon Textract for a document and return the response.

    The sync API (AnalyzeDocument when ``features`` are given, DetectDocumentText
    otherwise) is used for in-memory bytes and for files whose extension is in
    ``sync_suffixes``. The async API (StartDocumentAnalysis /
    StartDocumentTextDetection) is used for documents on S3 whose extension is in
    ``only_async_suffixes`` (e.g. PDF), or when ``force_async_api=True``.
    For a multi-page TIFF the caller has to set ``force_async_api=True``; otherwise
    the sync API is called and Textract raises
    botocore.errorfactory.UnsupportedDocumentException.

    input_document: ``s3://<bucket>/<key>`` string for a document on S3, any other
        string for a local file path, or bytes/bytearray for an in-memory document.
    features: Textract feature types; when set, the "analyze" APIs are called,
        otherwise the "text detection" APIs.
    output_config: S3 output location for async results (async only; the document
        has to be on S3).
    kms_key_id: passed down to the async Textract API.
    job_tag: passed down to the async Textract API.
    notification_channel: passed down to the async Textract API; requires
        ``return_job_id=True``.
    client_request_token: passed down to the async Textract API.
    return_job_id: for async calls, return the submission response (incl. JobId)
        instead of waiting for the full result. Ignored for sync calls.
    force_async_api: call the async API even for image formats (the document has
        to be on S3).
    boto3_textract_client: boto3 Textract client to use (e.g. to overcome a missing
        region in the environment); a default client is created when not given.
    job_done_polling_interval: async only - polling interval in seconds while
        waiting for the job to finish (1 by default).
    returns: dict with either the Textract response or the async submission
        response (incl. the JobId).
    raises: ValueError for invalid argument combinations, malformed S3 locations,
        unsupported file extensions or unsupported input types; Exception when the
        async submission does not return HTTP 200. LimitExceededException from the
        Textract API is not caught; the calling function is expected to handle it.
    """
    logger.debug("call_textract")
    if not boto3_textract_client:
        textract = boto3.client("textract")
    else:
        textract = boto3_textract_client

    if isinstance(input_document, str):
        is_s3_document: bool = len(input_document) > 7 and input_document.lower().startswith("s3://")
        s3_bucket, s3_key = _split_s3_url(input_document) if is_s3_document else ("", "")
        ext: str = os.path.splitext(input_document)[1].lower()
        is_pdf: bool = ext in only_async_suffixes

        if is_pdf and not is_s3_document:
            raise ValueError("PDF only supported when located on S3")
        if not is_s3_document and force_async_api:
            raise ValueError("when forcing async, document has to be on s3")
        if not is_s3_document and output_config:
            raise ValueError("only can have output_config for async processes with document location on s3")
        if notification_channel and not return_job_id:
            raise ValueError("when submitting notification_channel, has to also expect the job_id as result atm.")

        # ASYNC: PDFs and forced-async documents (both guaranteed to be on S3 by now)
        if is_pdf or (force_async_api and is_s3_document):
            logger.debug("is_pdf or force_async_api and is_s3_document")
            params = generate_request_params(
                document_location=DocumentLocation(s3_bucket=s3_bucket, s3_prefix=s3_key),
                features=features,
                output_config=output_config,
                notification_channel=notification_channel,
                kms_key_id=kms_key_id,
                client_request_token=client_request_token,
                job_tag=job_tag,
            )
            if features:
                textract_api = Textract_API.ANALYZE
                submission_status = textract.start_document_analysis(**params)
            else:
                textract_api = Textract_API.DETECT
                submission_status = textract.start_document_text_detection(**params)
            if submission_status["ResponseMetadata"]["HTTPStatusCode"] != 200:
                raise Exception(f"Got non-200 response code: {submission_status}")
            if return_job_id:
                return submission_status
            return get_full_json(submission_status['JobId'],
                                 textract_api=textract_api,
                                 boto3_textract_client=textract,
                                 job_done_polling_interval=job_done_polling_interval)

        # SYNC: previously an unsupported extension fell through and returned {} silently
        if ext not in sync_suffixes:
            raise ValueError(f"unsupported file extension '{ext}' for the sync API "
                             f"(supported: {sync_suffixes}); for documents on S3 use force_async_api=True")

        if is_s3_document:
            # The sync APIs only take the document and feature types; the async-only
            # options are dropped here, consistent with the local-file and bytes paths.
            ignored = [name for name, value in (("output_config", output_config),
                                                ("kms_key_id", kms_key_id),
                                                ("notification_channel", notification_channel)) if value]
            if ignored:
                logger.warning(f"ignoring {ignored} for sync API call; set force_async_api=True to use them")
            document = Document(s3_bucket=s3_bucket, s3_prefix=s3_key)
        else:
            with open(input_document, 'rb') as input_file:
                document = Document(byte_data=bytearray(input_file.read()))
        return _call_sync_api(textract, generate_request_params(document=document, features=features), features)

    if isinstance(input_document, (bytes, bytearray)):
        # in-memory documents can only be processed by the sync API
        logger.debug("processing bytes or bytearray")
        if force_async_api:
            raise ValueError("cannot run async for bytearray")
        params = generate_request_params(
            document=Document(byte_data=input_document),
            features=features,
        )
        return _call_sync_api(textract, params, features)

    raise ValueError(f"unsupported input_document type: {type(input_document)}")