in arrow/flight/flightsql/server.go [1077:1396]
func (f *flightSqlServer) DoAction(cmd *flight.Action, stream flight.FlightService_DoActionServer) error {
var anycmd anypb.Any
switch cmd.Type {
case flight.CancelFlightInfoActionType:
var (
request flight.CancelFlightInfoRequest
result flight.CancelFlightInfoResult
err error
)
if err = proto.Unmarshal(cmd.Body, &request); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to unmarshal CancelFlightInfoRequest for CancelFlightInfo: %s", err.Error())
}
result, err = f.srv.CancelFlightInfo(stream.Context(), &request)
if err != nil {
return err
}
out := &pb.Result{}
out.Body, err = proto.Marshal(&result)
if err != nil {
return err
}
return stream.Send(out)
case flight.RenewFlightEndpointActionType:
var (
request flight.RenewFlightEndpointRequest
err error
)
if err = proto.Unmarshal(cmd.Body, &request); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to unmarshal FlightEndpoint for RenewFlightEndpoint: %s", err.Error())
}
renewedEndpoint, err := f.srv.RenewFlightEndpoint(stream.Context(), &request)
if err != nil {
return err
}
out := &pb.Result{}
out.Body, err = proto.Marshal(renewedEndpoint)
if err != nil {
return err
}
return stream.Send(out)
case BeginSavepointActionType:
if err := proto.Unmarshal(cmd.Body, &anycmd); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to parse command: %s", err.Error())
}
var (
request pb.ActionBeginSavepointRequest
result pb.ActionBeginSavepointResult
id []byte
err error
)
if err = anycmd.UnmarshalTo(&request); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to unmarshal google.protobuf.Any: %s", err.Error())
}
if id, err = f.srv.BeginSavepoint(stream.Context(), &request); err != nil {
return err
}
result.SavepointId = id
out, err := packActionResult(&result)
if err != nil {
return err
}
return stream.Send(out)
case BeginTransactionActionType:
if err := proto.Unmarshal(cmd.Body, &anycmd); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to parse command: %s", err.Error())
}
var (
request pb.ActionBeginTransactionRequest
result pb.ActionBeginTransactionResult
id []byte
err error
)
if err = anycmd.UnmarshalTo(&request); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to unmarshal google.protobuf.Any: %s", err.Error())
}
if id, err = f.srv.BeginTransaction(stream.Context(), &request); err != nil {
return err
}
result.TransactionId = id
out, err := packActionResult(&result)
if err != nil {
return err
}
return stream.Send(out)
case CancelQueryActionType:
if err := proto.Unmarshal(cmd.Body, &anycmd); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to parse command: %s", err.Error())
}
var (
//nolint:staticcheck,SA1019 for backward compatibility
request pb.ActionCancelQueryRequest
//nolint:staticcheck,SA1019 for backward compatibility
result pb.ActionCancelQueryResult
info flight.FlightInfo
err error
)
if err = anycmd.UnmarshalTo(&request); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to unmarshal google.protobuf.Any: %s", err.Error())
}
if err = proto.Unmarshal(request.Info, &info); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to unmarshal FlightInfo for CancelQuery: %s", err)
}
if cancel, ok := f.srv.(cancelQueryServer); ok {
result.Result, err = cancel.CancelQuery(stream.Context(), &cancelQueryRequest{&info})
if err != nil {
return err
}
} else {
cancelFlightInfoRequest := flight.CancelFlightInfoRequest{Info: &info}
cancelFlightInfoResult, err := f.srv.CancelFlightInfo(stream.Context(), &cancelFlightInfoRequest)
if err != nil {
return err
}
result.Result = cancelStatusToCancelResult(cancelFlightInfoResult.Status)
}
out, err := packActionResult(&result)
if err != nil {
return err
}
return stream.Send(out)
case CreatePreparedStatementActionType:
if err := proto.Unmarshal(cmd.Body, &anycmd); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to parse command: %s", err.Error())
}
var (
request pb.ActionCreatePreparedStatementRequest
result pb.ActionCreatePreparedStatementResult
ret pb.Result
)
if err := anycmd.UnmarshalTo(&request); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to unmarshal google.protobuf.Any: %s", err.Error())
}
output, err := f.srv.CreatePreparedStatement(stream.Context(), &request)
if err != nil {
return err
}
result.PreparedStatementHandle = output.Handle
if output.DatasetSchema != nil {
result.DatasetSchema = flight.SerializeSchema(output.DatasetSchema, f.mem)
}
if output.ParameterSchema != nil {
result.ParameterSchema = flight.SerializeSchema(output.ParameterSchema, f.mem)
}
if err := anycmd.MarshalFrom(&result); err != nil {
return status.Errorf(codes.Internal, "unable to marshal final response: %s", err.Error())
}
if ret.Body, err = proto.Marshal(&anycmd); err != nil {
return status.Errorf(codes.Internal, "unable to marshal result: %s", err.Error())
}
return stream.Send(&ret)
case CreatePreparedSubstraitPlanActionType:
if err := proto.Unmarshal(cmd.Body, &anycmd); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to parse command: %s", err.Error())
}
var (
request pb.ActionCreatePreparedSubstraitPlanRequest
result pb.ActionCreatePreparedStatementResult
ret pb.Result
)
if err := anycmd.UnmarshalTo(&request); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to unmarshal google.protobuf.Any: %s", err.Error())
}
output, err := f.srv.CreatePreparedSubstraitPlan(stream.Context(), &createPreparedSubstraitPlanReq{&request})
if err != nil {
return err
}
result.PreparedStatementHandle = output.Handle
if output.DatasetSchema != nil {
result.DatasetSchema = flight.SerializeSchema(output.DatasetSchema, f.mem)
}
if output.ParameterSchema != nil {
result.ParameterSchema = flight.SerializeSchema(output.ParameterSchema, f.mem)
}
if err := anycmd.MarshalFrom(&result); err != nil {
return status.Errorf(codes.Internal, "unable to marshal final response: %s", err.Error())
}
if ret.Body, err = proto.Marshal(&anycmd); err != nil {
return status.Errorf(codes.Internal, "unable to marshal result: %s", err.Error())
}
return stream.Send(&ret)
case ClosePreparedStatementActionType:
if err := proto.Unmarshal(cmd.Body, &anycmd); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to parse command: %s", err.Error())
}
var request pb.ActionClosePreparedStatementRequest
if err := anycmd.UnmarshalTo(&request); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to unmarshal google.protobuf.Any: %s", err.Error())
}
if err := f.srv.ClosePreparedStatement(stream.Context(), &request); err != nil {
return err
}
return stream.Send(&pb.Result{})
case EndTransactionActionType:
if err := proto.Unmarshal(cmd.Body, &anycmd); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to parse command: %s", err.Error())
}
var request pb.ActionEndTransactionRequest
if err := anycmd.UnmarshalTo(&request); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to unmarshal google.protobuf.Any: %s", err.Error())
}
if err := f.srv.EndTransaction(stream.Context(), &request); err != nil {
return err
}
return stream.Send(&pb.Result{})
case EndSavepointActionType:
if err := proto.Unmarshal(cmd.Body, &anycmd); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to parse command: %s", err.Error())
}
var request pb.ActionEndSavepointRequest
if err := anycmd.UnmarshalTo(&request); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to unmarshal google.protobuf.Any: %s", err.Error())
}
if err := f.srv.EndSavepoint(stream.Context(), &request); err != nil {
return err
}
return stream.Send(&pb.Result{})
case flight.SetSessionOptionsActionType:
var (
request flight.SetSessionOptionsRequest
err error
)
if err = proto.Unmarshal(cmd.Body, &request); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to unmarshal SetSessionOptionsRequest: %s", err.Error())
}
response, err := f.srv.SetSessionOptions(stream.Context(), &request)
if err != nil {
return err
}
out := &pb.Result{}
out.Body, err = proto.Marshal(response)
if err != nil {
return err
}
return stream.Send(out)
case flight.GetSessionOptionsActionType:
var (
request flight.GetSessionOptionsRequest
err error
)
if err = proto.Unmarshal(cmd.Body, &request); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to unmarshal GetSessionOptionsRequest: %s", err.Error())
}
response, err := f.srv.GetSessionOptions(stream.Context(), &request)
if err != nil {
return err
}
out := &pb.Result{}
out.Body, err = proto.Marshal(response)
if err != nil {
return err
}
return stream.Send(out)
case flight.CloseSessionActionType:
var (
request flight.CloseSessionRequest
err error
)
if err = proto.Unmarshal(cmd.Body, &request); err != nil {
return status.Errorf(codes.InvalidArgument, "unable to unmarshal CloseSessionRequest: %s", err.Error())
}
response, err := f.srv.CloseSession(stream.Context(), &request)
if err != nil {
return err
}
out := &pb.Result{}
out.Body, err = proto.Marshal(response)
if err != nil {
return err
}
return stream.Send(out)
default:
return status.Error(codes.InvalidArgument, "the defined request is invalid.")
}
}