in flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java [355:477]
default void doAction(CallContext context, Action action, StreamListener<Result> listener) {
final String actionType = action.getType();
if (actionType.equals(FlightSqlUtils.FLIGHT_SQL_BEGIN_SAVEPOINT.getType())) {
final ActionBeginSavepointRequest request =
FlightSqlUtils.unpackAndParseOrThrow(action.getBody(), ActionBeginSavepointRequest.class);
beginSavepoint(request, context, new ProtoListener<>(listener));
} else if (actionType.equals(FlightSqlUtils.FLIGHT_SQL_BEGIN_TRANSACTION.getType())) {
final ActionBeginTransactionRequest request =
FlightSqlUtils.unpackAndParseOrThrow(
action.getBody(), ActionBeginTransactionRequest.class);
beginTransaction(request, context, new ProtoListener<>(listener));
} else if (actionType.equals(FlightSqlUtils.FLIGHT_SQL_CANCEL_QUERY.getType())) {
//noinspection deprecation
final ActionCancelQueryRequest request =
FlightSqlUtils.unpackAndParseOrThrow(action.getBody(), ActionCancelQueryRequest.class);
final FlightInfo info;
try {
info = FlightInfo.deserialize(request.getInfo().asReadOnlyByteBuffer());
} catch (IOException | URISyntaxException e) {
listener.onError(
CallStatus.INTERNAL
.withDescription("Could not unpack FlightInfo: " + e)
.withCause(e)
.toRuntimeException());
return;
}
cancelQuery(info, context, new CancelListener(listener));
} else if (actionType.equals(FlightSqlUtils.FLIGHT_SQL_CREATE_PREPARED_STATEMENT.getType())) {
final ActionCreatePreparedStatementRequest request =
FlightSqlUtils.unpackAndParseOrThrow(
action.getBody(), ActionCreatePreparedStatementRequest.class);
createPreparedStatement(request, context, listener);
} else if (actionType.equals(
FlightSqlUtils.FLIGHT_SQL_CREATE_PREPARED_SUBSTRAIT_PLAN.getType())) {
final ActionCreatePreparedSubstraitPlanRequest request =
FlightSqlUtils.unpackAndParseOrThrow(
action.getBody(), ActionCreatePreparedSubstraitPlanRequest.class);
createPreparedSubstraitPlan(request, context, new ProtoListener<>(listener));
} else if (actionType.equals(FlightSqlUtils.FLIGHT_SQL_CLOSE_PREPARED_STATEMENT.getType())) {
final ActionClosePreparedStatementRequest request =
FlightSqlUtils.unpackAndParseOrThrow(
action.getBody(), ActionClosePreparedStatementRequest.class);
closePreparedStatement(request, context, new NoResultListener(listener));
} else if (actionType.equals(FlightSqlUtils.FLIGHT_SQL_END_SAVEPOINT.getType())) {
ActionEndSavepointRequest request =
FlightSqlUtils.unpackAndParseOrThrow(action.getBody(), ActionEndSavepointRequest.class);
endSavepoint(request, context, new NoResultListener(listener));
} else if (actionType.equals(FlightSqlUtils.FLIGHT_SQL_END_TRANSACTION.getType())) {
ActionEndTransactionRequest request =
FlightSqlUtils.unpackAndParseOrThrow(action.getBody(), ActionEndTransactionRequest.class);
endTransaction(request, context, new NoResultListener(listener));
} else if (actionType.equals(FlightConstants.CANCEL_FLIGHT_INFO.getType())) {
final CancelFlightInfoRequest request;
try {
request = CancelFlightInfoRequest.deserialize(ByteBuffer.wrap(action.getBody()));
} catch (IOException | URISyntaxException e) {
listener.onError(
CallStatus.INTERNAL
.withDescription("Could not unpack FlightInfo: " + e)
.withCause(e)
.toRuntimeException());
return;
}
cancelFlightInfo(request, context, new CancelStatusListener(listener));
} else if (actionType.equals(FlightConstants.RENEW_FLIGHT_ENDPOINT.getType())) {
final RenewFlightEndpointRequest request;
try {
request = RenewFlightEndpointRequest.deserialize(ByteBuffer.wrap(action.getBody()));
} catch (IOException | URISyntaxException e) {
listener.onError(
CallStatus.INTERNAL
.withDescription("Could not unpack FlightInfo: " + e)
.withCause(e)
.toRuntimeException());
return;
}
renewFlightEndpoint(request, context, new FlightEndpointListener(listener));
} else if (actionType.equals(FlightConstants.SET_SESSION_OPTIONS.getType())) {
final SetSessionOptionsRequest request;
try {
request = SetSessionOptionsRequest.deserialize(ByteBuffer.wrap(action.getBody()));
} catch (IOException e) {
listener.onError(
CallStatus.INTERNAL
.withDescription("Could not unpack SetSessionOptionsRequest: " + e)
.withCause(e)
.toRuntimeException());
return;
}
setSessionOptions(request, context, new SetSessionOptionsResultListener(listener));
} else if (actionType.equals(FlightConstants.GET_SESSION_OPTIONS.getType())) {
final GetSessionOptionsRequest request;
try {
request = GetSessionOptionsRequest.deserialize(ByteBuffer.wrap(action.getBody()));
} catch (IOException e) {
listener.onError(
CallStatus.INTERNAL
.withDescription("Could not unpack GetSessionOptionsRequest: " + e)
.withCause(e)
.toRuntimeException());
return;
}
getSessionOptions(request, context, new GetSessionOptionsResultListener(listener));
} else if (actionType.equals(FlightConstants.CLOSE_SESSION.getType())) {
final CloseSessionRequest request;
try {
request = CloseSessionRequest.deserialize(ByteBuffer.wrap(action.getBody()));
} catch (IOException e) {
listener.onError(
CallStatus.INTERNAL
.withDescription("Could not unpack CloseSessionRequest: " + e)
.withCause(e)
.toRuntimeException());
return;
}
closeSession(request, context, new CloseSessionResultListener(listener));
} else {
throw CallStatus.INVALID_ARGUMENT
.withDescription("Unrecognized request: " + action.getType())
.toRuntimeException();
}
}