in providers/edge3/src/airflow/providers/edge3/worker_api/routes/_v2_routes.py [0:0]
def rpcapi_v2(body: dict[str, Any]) -> APIResponse:
    """
    Handle Edge Worker API `/edge_worker/v1/rpcapi` endpoint for Airflow 2.10.

    Checks the ``Content-Type`` and ``Accept`` headers, authorizes the call with the
    value of the ``Authorization`` header, looks the requested method up in the
    internal API method map, calls it with the deserialized parameters and returns
    the serialized result as JSON.

    :param body: Decoded JSON-RPC request body. ``method`` and ``jsonrpc`` are required;
        ``params`` may be omitted or empty for calls that take no arguments.
    :return: Response carrying the JSON-serialized result of the called method. A
        ``KeyError``, ``AttributeError`` or ``AirflowException`` raised by the call is
        serialized and sent back the same way so the caller can see the exception.
        Every ``HTTPException`` raised while handling the request is converted with
        its ``to_response()`` method instead of propagating.
    """
    # Note: Except the method map this _was_ a 100% copy of internal API module
    #       airflow.api_internal.endpoints.rpc_api_endpoint.internal_airflow_api()
    # As of rework for FastAPI in Airflow 3.0, this is updated and to be removed in the future.
    from airflow.api_internal.endpoints.rpc_api_endpoint import (  # type: ignore[attr-defined]
        # Note: This is just for compatibility with Airflow 2.10, not working for Airflow 3 / main as removed
        initialize_method_map,
    )

    try:
        # NOTE(review): both header checks are exact string comparisons, so a value that
        # carries parameters (e.g. "application/json; charset=utf-8") is rejected as well.
        if request.headers.get("Content-Type", "") != "application/json":
            raise HTTPException(status.HTTP_403_FORBIDDEN, "Expected Content-Type: application/json")
        if request.headers.get("Accept", "") != "application/json":
            raise HTTPException(status.HTTP_403_FORBIDDEN, "Expected Accept: application/json")
        auth = request.headers.get("Authorization", "")

        # A body lacking a mandatory field is a client error; report it as such instead of
        # letting the KeyError escape unhandled.
        try:
            method = body["method"]
            jsonrpc = body["jsonrpc"]
        except KeyError as err:
            raise error_response(
                f"Missing required field {err} in request body.", status.HTTP_400_BAD_REQUEST
            ) from err
        # "params" is optional: a missing value is handled like an empty one further down.
        request_obj = JsonRpcRequest(method=method, jsonrpc=jsonrpc, params=body.get("params"))
        jwt_token_authorization_rpc(request_obj, auth)
        if request_obj.jsonrpc != "2.0":
            raise error_response("Expected jsonrpc 2.0 request.", status.HTTP_400_BAD_REQUEST)

        log.debug("Got request for %s", request_obj.method)
        methods_map = initialize_method_map()
        if request_obj.method not in methods_map:
            raise error_response(f"Unrecognized method: {request_obj.method}.", status.HTTP_400_BAD_REQUEST)

        handler = methods_map[request_obj.method]
        params = {}
        try:
            if request_obj.params:
                # Note, this is Airflow 2.10 specific, as it uses Pydantic models for serialization
                params = BaseSerialization.deserialize(request_obj.params, use_pydantic_models=True)  # type: ignore[call-arg]
        except Exception as err:
            raise error_response("Error deserializing parameters.", status.HTTP_400_BAD_REQUEST) from err

        log.debug("Calling method %s\nparams: %s", request_obj.method, params)
        try:
            # Session must be created there as it may be needed by serializer for lazy-loaded fields.
            with create_session() as session:
                output = handler(**params, session=session)
                # Note, this is Airflow 2.10 specific, as it uses Pydantic models for serialization
                output_json = BaseSerialization.serialize(output, use_pydantic_models=True)  # type: ignore[call-arg]
                # Encode once and reuse the text for both the log line and the response body.
                # Kept inside the try so an encoding failure is still reported as a 500 below.
                response = json.dumps(output_json) if output_json is not None else None
                log.debug("Sending response: %s", response)
        # In case of AirflowException or other selective known types, transport the exception class back to caller
        except (KeyError, AttributeError, AirflowException) as e:
            # Note, this is Airflow 2.10 specific, as it uses Pydantic models for serialization
            output_json = BaseSerialization.serialize(e, use_pydantic_models=True)  # type: ignore[call-arg]
            response = json.dumps(output_json) if output_json is not None else None
            log.debug("Sending exception response: %s", response)
        except Exception as err:
            raise error_response(
                f"Error executing method '{request_obj.method}'.", status.HTTP_500_INTERNAL_SERVER_ERROR
            ) from err

        return Response(response=response, headers={"Content-Type": "application/json"})
    except HTTPException as e:
        return e.to_response()  # type: ignore[attr-defined]