def lambda_handler()

in lambda/api/lambda_invoke.py [0:0]


def lambda_handler(event, context):
    """Dispatch an HTTP request event to the stats, invocation or conversion handler.

    The event is read like an API Gateway Lambda proxy event (it must carry
    ``httpMethod``, ``body``, ``path`` and, for non-stats paths,
    ``requestContext.identity``) -- NOTE(review): confirm the integration type.

    Args:
        event: Request event. ``body`` must be a JSON object containing
            ``endpoint_name``; it may also contain ``endpoint_variant``,
            ``inference_id``, ``user_id``, ``content_type``, ``data`` and
            ``reward`` depending on the path.
        context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode`` and a JSON-encoded ``body`` holding the
        handler result.

    Raises:
        Exception: If the request is not a POST/PUT with a JSON object body,
            ``endpoint_name`` is missing, the path is not one of ``/stats``,
            ``/invocation`` or ``/conversion``, or a downstream call fails.
            A ``ClientError`` is re-raised as a plain ``Exception`` carrying
            only the service error message.
    """
    try:
        logger.debug(json.dumps(event))

        # Get elements from API payload
        body = _parse_request_body(event)

        endpoint_name = body.get("endpoint_name")
        if endpoint_name is None:
            raise Exception("Require endpoint name in body")

        # Optionally allow overriding the endpoint variant
        endpoint_variant = body.get("endpoint_variant")

        path = event["path"]
        if path == "/stats":
            # Get stats for existing endpoint
            result, status_code = handle_stats(endpoint_name)
        elif path in ("/invocation", "/conversion"):
            # Get inference id and user id from request, or generate new ones
            inference_id = body.get("inference_id", str(uuid.uuid4()))
            user_id = str(body.get("user_id", uuid.uuid4()))

            strategy, user_variant, status_code = _select_variant(
                endpoint_name, endpoint_variant, user_id
            )

            # Request identity (source IP and user agent) attached to the metric
            identity = event["requestContext"]["identity"]
            request_identity = {
                "source_ip": identity["sourceIp"],
                "user_agent": identity["userAgent"],
            }

            if path == "/invocation":
                content_type = body.get("content_type", "application/json")
                data = body["data"]
                result = handle_invocation(
                    strategy=strategy,
                    endpoint_name=endpoint_name,
                    content_type=content_type,
                    inference_id=inference_id,
                    user_id=user_id,
                    target_variant=user_variant,
                    data=data,
                )
                log_metric("invocation", result, request_identity)
            else:
                # Get default reward of "1" unless provided
                reward = float(body.get("reward", "1"))
                result = handle_conversion(
                    strategy=strategy,
                    endpoint_name=endpoint_name,
                    inference_id=inference_id,
                    user_id=user_id,
                    user_variant=user_variant,
                    reward=reward,
                )
                log_metric("conversion", result, request_identity)
        else:
            # Reject unknown paths before doing any variant lookup work
            raise Exception(f"Invalid path: {path}")

        # Log successful result and return
        logger.debug(json.dumps(result))
        return {"statusCode": status_code, "body": json.dumps(result)}
    except ClientError as e:
        logger.error(e)
        # Surface only the boto3 specific error message, keeping the cause chained
        error_message = e.response["Error"]["Message"]
        raise Exception(error_message) from e
    except Exception as e:
        logger.error(e)
        raise


def _parse_request_body(event):
    """Return the JSON object sent in the body of a POST/PUT request event."""
    # The "body" key can be present with a None value, so test the value
    # rather than key membership.
    raw_body = event.get("body")
    if event["httpMethod"] not in ("POST", "PUT") or raw_body is None:
        raise Exception("Require HTTP POST with json body")

    body = json.loads(raw_body)
    if not isinstance(body, dict):
        # Valid JSON that is not an object (list, string, number) has no fields
        raise Exception("Require json object in body")
    return body


def _select_variant(endpoint_name, endpoint_variant, user_id):
    """Return (strategy, variant, status_code) for the request."""
    if endpoint_variant is not None:
        # Log the manual strategy for the endpoint variant
        logger.info(
            f"Manual override endpoint: {endpoint_name} variant: {endpoint_variant}"
        )
        return "Manual", endpoint_variant, 202

    try:
        # Get the configuration for the endpoint name; unpack inside the try
        # so a malformed return value also triggers the fallback.
        strategy, user_variant, status_code = get_user_variant(
            endpoint_name, user_id
        )
    except Exception as e:
        # Best effort: log warning and return fallback strategy
        logger.warning("Unable to get user variant")
        logger.warning(e)
        return "Fallback", None, 202
    return strategy, user_variant, status_code