def handle_request()

in pipeline/ocr/fn-call-textract/main.py
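
The function relies on module-level setup defined elsewhere in main.py: the Textract client, logger, callback SNS settings, DynamoDB state-cache table, feature defaults, the MalformedRequest error and the send_result() helper (not sketched here). A minimal sketch of that setup is shown below; the environment variable names and default values are hypothetical, chosen only to make the globals readable, and are not the module's actual configuration:

import hashlib
import logging
import os
import time

import boto3

logger = logging.getLogger()
textract = boto3.client("textract")
ddb_state_cache_table = boto3.resource("dynamodb").Table(os.environ["STATE_CACHE_TABLE"])

# Hypothetical defaults and environment variable names, for illustration only:
default_textract_features = ["FORMS", "TABLES"]
force_analyze_apis = False  # When True, use the Analyze* APIs even if no features are requested
is_textract_sync = os.environ.get("USE_SYNC_TEXTRACT", "false").lower() == "true"
callback_sns_role_arn = os.environ["CALLBACK_SNS_ROLE_ARN"]
callback_sns_topic_arn = os.environ["CALLBACK_SNS_TOPIC_ARN"]
STATE_CACHE_TTL_SECS = 7 * 24 * 60 * 60  # Expire cached job state after e.g. one week


class MalformedRequest(ValueError):
    """Raised when the incoming event is missing required fields or has invalid values"""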


def handle_request(event, context):
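    """Handle a document OCR request from AWS Step Functions

    Depending on configuration, either calls Amazon Textract synchronously and forwards the result
    via send_result(), or starts an asynchronous Textract job (with SNS completion notification)
    and caches the Step Functions task token in DynamoDB so the callback can respond later.
    """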
    try:
        srcbucket = event["Input"]["Bucket"]
        srckey = event["Input"]["Key"]
        task_token = event["TaskToken"]
        textract_features = event["Output"].get("Features", default_textract_features)
        output_type = event["Output"].get("Type", "s3")
        output_type_lower = output_type.lower()
        if output_type_lower == "inline":
            # No other config to collect for inline output
            destbucket = None
            destkey = None
        elif output_type_lower == "s3":
            destbucket = event["Output"].get("Bucket", srcbucket)
            if event["Output"].get("Key"):
                destkey = event["Output"]["Key"]
            else:
                prefix = event["Output"].get("Prefix", "")
                if prefix and not prefix.endswith("/"):
                    prefix += "/"
                destkey = "".join([prefix, srckey, ".textract.json"])
        else:
            raise MalformedRequest(
                f"Unknown output integration type '{output_type}': Expected 'Inline' or 'S3'"
            )
    except KeyError as ke:
        raise MalformedRequest(f"Missing field {ke}, please check your input payload")

    input_doc = {
        "S3Object": {
            "Bucket": srcbucket,
            "Name": srckey,
        },
    }
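    # In synchronous mode, call Textract's real-time APIs and forward the result straight away: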
    if is_textract_sync:
        if len(textract_features) or force_analyze_apis:
            result = textract.analyze_document(
                Document=input_doc,
                FeatureTypes=textract_features,
            )
        else:
            result = textract.detect_document_text(
                Document=input_doc,
            )

        return send_result(
            textract_result=result,
            sfn_task_token=task_token,
            dest_bucket=destbucket,
            dest_key=destkey,
        )
    else:
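        # Asynchronous mode: start a Textract job (completion notified via SNS) and cache the Step
        # Functions task token so the callback handler can respond when the job finishes.
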
        # ClientRequestToken allows idempotency in case of retries - but only up to 64 chars (so
        # we can't use the TaskToken). Different use cases might have different idempotency needs,
        # but remember that Textract won't re-trigger an SNS notification for a job it remembers.
        # By default we'll enforce idempotency by source location + target location (or current
        # timestamp if target location not provided), and provide an additional `IdempotencySalt`
        # escape hatch in case users need to force re-processing:
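        # (A SHA-256 hex digest is exactly 64 characters, so it fits within that limit.)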
        urihash = hashlib.sha256()
        urihash.update(f"s3://{srcbucket}/{srckey}".encode("utf-8"))
        if destbucket:
            urihash.update(f"s3://{destbucket}/{destkey}".encode("utf-8"))
            if "IdempotencySalt" in event:
                urihash.update(event["IdempotencySalt"].encode("utf-8"))
        elif "IdempotencySalt" in event:
            urihash.update(event["IdempotencySalt"].encode("utf-8"))
        else:
            urihash.update(f"{time.time()}".encode("utf-8"))
        # TODO: Any way we could catch an idempotency issue and pro-actively return result?
        start_params = {
            "DocumentLocation": input_doc,
            "ClientRequestToken": urihash.hexdigest(),
            # "JobTag": "Howdy",
            "NotificationChannel": {
                "RoleArn": callback_sns_role_arn,
                "SNSTopicArn": callback_sns_topic_arn,
            },
        }

        logger.info("Textract params: %s (features = %s)", start_params, textract_features)
        if len(textract_features) or force_analyze_apis:
            job = textract.start_document_analysis(
                FeatureTypes=textract_features,
                **start_params,
            )
        else:
            job = textract.start_document_text_detection(**start_params)
        logger.info(
            "Started async job %s for s3://%s/%s\n%s",
            job.get("JobId"),
            srcbucket,
            srckey,
            job,
        )
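        # Persist the JobId -> task token mapping (and output destination) so the completion
        # callback can retrieve it. ExpiresAt supports a DynamoDB TTL to clean up stale entries: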
        cache_data = {
            "TextractJobId": job["JobId"],
            "SFnTaskToken": task_token,
            "OutputType": output_type_lower,
        }
        if STATE_CACHE_TTL_SECS:
            cache_data["ExpiresAt"] = int(time.time()) + STATE_CACHE_TTL_SECS
        if output_type_lower == "s3":
            cache_data["OutputS3Bucket"] = destbucket
            cache_data["OutputS3Key"] = destkey
        ddb_state_cache_table.put_item(
            Item=cache_data,
            ReturnValues="NONE",
        )
        return cache_data
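
For reference, a request for asynchronous S3 output might look like the sketch below. The field names are those parsed above; the bucket, key, prefix and salt values are made up for illustration:

example_event = {
    "Input": {"Bucket": "my-input-bucket", "Key": "docs/invoice-001.pdf"},
    "TaskToken": "<opaque Step Functions task token>",
    "Output": {
        "Type": "S3",  # Or "Inline" to return the Textract result in the response payload
        "Features": ["FORMS", "TABLES"],  # If omitted, falls back to default_textract_features
        "Prefix": "textract-results",  # Used to derive the output key when no Output.Key is given
    },
    "IdempotencySalt": "change-me-to-force-reprocessing",  # Optional
}

With this payload, Output.Bucket defaults to the source bucket and the derived output key would be textract-results/docs/invoice-001.pdf.textract.json.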