def rpcapi_v2()

in providers/edge3/src/airflow/providers/edge3/worker_api/routes/_v2_routes.py [0:0]


def rpcapi_v2(body: dict[str, Any]) -> APIResponse:
    """Handle Edge Worker API `/edge_worker/v1/rpcapi` endpoint for Airflow 2.10."""
    # Note: Except the method map this _was_ a 100% copy of internal API module
    #       airflow.api_internal.endpoints.rpc_api_endpoint.internal_airflow_api()
    # As of rework for FastAPI in Airflow 3.0, this is updated and to be removed in the future.
    from airflow.api_internal.endpoints.rpc_api_endpoint import (  # type: ignore[attr-defined]
        # Note: This is just for compatibility with Airflow 2.10, not working for Airflow 3 / main as removed
        initialize_method_map,
    )

    try:
        if request.headers.get("Content-Type", "") != "application/json":
            raise HTTPException(status.HTTP_403_FORBIDDEN, "Expected Content-Type: application/json")
        if request.headers.get("Accept", "") != "application/json":
            raise HTTPException(status.HTTP_403_FORBIDDEN, "Expected Accept: application/json")
        auth = request.headers.get("Authorization", "")
        request_obj = JsonRpcRequest(method=body["method"], jsonrpc=body["jsonrpc"], params=body["params"])
        jwt_token_authorization_rpc(request_obj, auth)
        if request_obj.jsonrpc != "2.0":
            raise error_response("Expected jsonrpc 2.0 request.", status.HTTP_400_BAD_REQUEST)

        log.debug("Got request for %s", request_obj.method)
        methods_map = initialize_method_map()
        if request_obj.method not in methods_map:
            raise error_response(f"Unrecognized method: {request_obj.method}.", status.HTTP_400_BAD_REQUEST)

        handler = methods_map[request_obj.method]
        params = {}
        try:
            if request_obj.params:
                # Note, this is Airflow 2.10 specific, as it uses Pydantic models for serialization
                params = BaseSerialization.deserialize(request_obj.params, use_pydantic_models=True)  # type: ignore[call-arg]
        except Exception:
            raise error_response("Error deserializing parameters.", status.HTTP_400_BAD_REQUEST)

        log.debug("Calling method %s\nparams: %s", request_obj.method, params)
        try:
            # Session must be created there as it may be needed by serializer for lazy-loaded fields.
            with create_session() as session:
                output = handler(**params, session=session)
                # Note, this is Airflow 2.10 specific, as it uses Pydantic models for serialization
                output_json = BaseSerialization.serialize(output, use_pydantic_models=True)  # type: ignore[call-arg]
                log.debug(
                    "Sending response: %s", json.dumps(output_json) if output_json is not None else None
                )
        # In case of AirflowException or other selective known types, transport the exception class back to caller
        except (KeyError, AttributeError, AirflowException) as e:
            # Note, this is Airflow 2.10 specific, as it uses Pydantic models for serialization
            output_json = BaseSerialization.serialize(e, use_pydantic_models=True)  # type: ignore[call-arg]
            log.debug(
                "Sending exception response: %s", json.dumps(output_json) if output_json is not None else None
            )
        except Exception:
            raise error_response(
                f"Error executing method '{request_obj.method}'.", status.HTTP_500_INTERNAL_SERVER_ERROR
            )
        response = json.dumps(output_json) if output_json is not None else None
        return Response(response=response, headers={"Content-Type": "application/json"})
    except HTTPException as e:
        return e.to_response()  # type: ignore[attr-defined]