default void doAction()

in flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java [355:477]


  default void doAction(CallContext context, Action action, StreamListener<Result> listener) {
    final String actionType = action.getType();

    if (actionType.equals(FlightSqlUtils.FLIGHT_SQL_BEGIN_SAVEPOINT.getType())) {
      final ActionBeginSavepointRequest request =
          FlightSqlUtils.unpackAndParseOrThrow(action.getBody(), ActionBeginSavepointRequest.class);
      beginSavepoint(request, context, new ProtoListener<>(listener));
    } else if (actionType.equals(FlightSqlUtils.FLIGHT_SQL_BEGIN_TRANSACTION.getType())) {
      final ActionBeginTransactionRequest request =
          FlightSqlUtils.unpackAndParseOrThrow(
              action.getBody(), ActionBeginTransactionRequest.class);
      beginTransaction(request, context, new ProtoListener<>(listener));
    } else if (actionType.equals(FlightSqlUtils.FLIGHT_SQL_CANCEL_QUERY.getType())) {
      //noinspection deprecation
      final ActionCancelQueryRequest request =
          FlightSqlUtils.unpackAndParseOrThrow(action.getBody(), ActionCancelQueryRequest.class);
      final FlightInfo info;
      try {
        info = FlightInfo.deserialize(request.getInfo().asReadOnlyByteBuffer());
      } catch (IOException | URISyntaxException e) {
        listener.onError(
            CallStatus.INTERNAL
                .withDescription("Could not unpack FlightInfo: " + e)
                .withCause(e)
                .toRuntimeException());
        return;
      }
      cancelQuery(info, context, new CancelListener(listener));
    } else if (actionType.equals(FlightSqlUtils.FLIGHT_SQL_CREATE_PREPARED_STATEMENT.getType())) {
      final ActionCreatePreparedStatementRequest request =
          FlightSqlUtils.unpackAndParseOrThrow(
              action.getBody(), ActionCreatePreparedStatementRequest.class);
      createPreparedStatement(request, context, listener);
    } else if (actionType.equals(
        FlightSqlUtils.FLIGHT_SQL_CREATE_PREPARED_SUBSTRAIT_PLAN.getType())) {
      final ActionCreatePreparedSubstraitPlanRequest request =
          FlightSqlUtils.unpackAndParseOrThrow(
              action.getBody(), ActionCreatePreparedSubstraitPlanRequest.class);
      createPreparedSubstraitPlan(request, context, new ProtoListener<>(listener));
    } else if (actionType.equals(FlightSqlUtils.FLIGHT_SQL_CLOSE_PREPARED_STATEMENT.getType())) {
      final ActionClosePreparedStatementRequest request =
          FlightSqlUtils.unpackAndParseOrThrow(
              action.getBody(), ActionClosePreparedStatementRequest.class);
      closePreparedStatement(request, context, new NoResultListener(listener));
    } else if (actionType.equals(FlightSqlUtils.FLIGHT_SQL_END_SAVEPOINT.getType())) {
      ActionEndSavepointRequest request =
          FlightSqlUtils.unpackAndParseOrThrow(action.getBody(), ActionEndSavepointRequest.class);
      endSavepoint(request, context, new NoResultListener(listener));
    } else if (actionType.equals(FlightSqlUtils.FLIGHT_SQL_END_TRANSACTION.getType())) {
      ActionEndTransactionRequest request =
          FlightSqlUtils.unpackAndParseOrThrow(action.getBody(), ActionEndTransactionRequest.class);
      endTransaction(request, context, new NoResultListener(listener));
    } else if (actionType.equals(FlightConstants.CANCEL_FLIGHT_INFO.getType())) {
      final CancelFlightInfoRequest request;
      try {
        request = CancelFlightInfoRequest.deserialize(ByteBuffer.wrap(action.getBody()));
      } catch (IOException | URISyntaxException e) {
        listener.onError(
            CallStatus.INTERNAL
                .withDescription("Could not unpack FlightInfo: " + e)
                .withCause(e)
                .toRuntimeException());
        return;
      }
      cancelFlightInfo(request, context, new CancelStatusListener(listener));
    } else if (actionType.equals(FlightConstants.RENEW_FLIGHT_ENDPOINT.getType())) {
      final RenewFlightEndpointRequest request;
      try {
        request = RenewFlightEndpointRequest.deserialize(ByteBuffer.wrap(action.getBody()));
      } catch (IOException | URISyntaxException e) {
        listener.onError(
            CallStatus.INTERNAL
                .withDescription("Could not unpack FlightInfo: " + e)
                .withCause(e)
                .toRuntimeException());
        return;
      }
      renewFlightEndpoint(request, context, new FlightEndpointListener(listener));
    } else if (actionType.equals(FlightConstants.SET_SESSION_OPTIONS.getType())) {
      final SetSessionOptionsRequest request;
      try {
        request = SetSessionOptionsRequest.deserialize(ByteBuffer.wrap(action.getBody()));
      } catch (IOException e) {
        listener.onError(
            CallStatus.INTERNAL
                .withDescription("Could not unpack SetSessionOptionsRequest: " + e)
                .withCause(e)
                .toRuntimeException());
        return;
      }
      setSessionOptions(request, context, new SetSessionOptionsResultListener(listener));
    } else if (actionType.equals(FlightConstants.GET_SESSION_OPTIONS.getType())) {
      final GetSessionOptionsRequest request;
      try {
        request = GetSessionOptionsRequest.deserialize(ByteBuffer.wrap(action.getBody()));
      } catch (IOException e) {
        listener.onError(
            CallStatus.INTERNAL
                .withDescription("Could not unpack GetSessionOptionsRequest: " + e)
                .withCause(e)
                .toRuntimeException());
        return;
      }
      getSessionOptions(request, context, new GetSessionOptionsResultListener(listener));
    } else if (actionType.equals(FlightConstants.CLOSE_SESSION.getType())) {
      final CloseSessionRequest request;
      try {
        request = CloseSessionRequest.deserialize(ByteBuffer.wrap(action.getBody()));
      } catch (IOException e) {
        listener.onError(
            CallStatus.INTERNAL
                .withDescription("Could not unpack CloseSessionRequest: " + e)
                .withCause(e)
                .toRuntimeException());
        return;
      }
      closeSession(request, context, new CloseSessionResultListener(listener));
    } else {
      throw CallStatus.INVALID_ARGUMENT
          .withDescription("Unrecognized request: " + action.getType())
          .toRuntimeException();
    }
  }