func()

in arrow/flight/flightsql/server.go [1077:1396]


// DoAction dispatches a Flight action to the matching handler on the wrapped
// server (f.srv) and writes exactly one Result to stream on success.
//
// Two request encodings are handled, selected by cmd.Type:
//   - CancelFlightInfo, RenewFlightEndpoint and the session actions
//     (SetSessionOptions, GetSessionOptions, CloseSession) carry the request
//     message directly in cmd.Body; the handler's response message is
//     marshalled directly into the Result body.
//   - The remaining (Flight SQL) actions carry the request wrapped in a
//     google.protobuf.Any. Their responses are either built by
//     packActionResult, wrapped in an Any here (the prepared statement
//     actions), or an empty Result (the Close/End actions).
//
// A request body that cannot be decoded, or an unrecognized action type,
// yields codes.InvalidArgument. Errors returned by the wrapped server and by
// packActionResult are returned unchanged. A response that cannot be
// marshalled by this function is reported as codes.Internal.
func (f *flightSqlServer) DoAction(cmd *flight.Action, stream flight.FlightService_DoActionServer) error {
	ctx := stream.Context()

	// unwrapAny decodes cmd.Body as a google.protobuf.Any and unpacks the
	// contained message into dst.
	unwrapAny := func(dst proto.Message) error {
		var wrapped anypb.Any
		if err := proto.Unmarshal(cmd.Body, &wrapped); err != nil {
			return status.Errorf(codes.InvalidArgument, "unable to parse command: %s", err.Error())
		}
		if err := wrapped.UnmarshalTo(dst); err != nil {
			return status.Errorf(codes.InvalidArgument, "unable to unmarshal google.protobuf.Any: %s", err.Error())
		}
		return nil
	}

	// sendMessage marshals msg into the body of a Result and sends it on the
	// stream. Marshalling failures are server-side problems, so they are
	// reported as codes.Internal rather than leaking out as codes.Unknown.
	sendMessage := func(msg proto.Message) error {
		body, err := proto.Marshal(msg)
		if err != nil {
			return status.Errorf(codes.Internal, "unable to marshal result: %s", err.Error())
		}
		return stream.Send(&pb.Result{Body: body})
	}

	// sendWrapped packs msg into a google.protobuf.Any and sends the Any as
	// the Result body.
	sendWrapped := func(msg proto.Message) error {
		var wrapped anypb.Any
		if err := wrapped.MarshalFrom(msg); err != nil {
			return status.Errorf(codes.Internal, "unable to marshal final response: %s", err.Error())
		}
		return sendMessage(&wrapped)
	}

	switch cmd.Type {
	case flight.CancelFlightInfoActionType:
		var request flight.CancelFlightInfoRequest
		if err := proto.Unmarshal(cmd.Body, &request); err != nil {
			return status.Errorf(codes.InvalidArgument, "unable to unmarshal CancelFlightInfoRequest for CancelFlightInfo: %s", err.Error())
		}

		result, err := f.srv.CancelFlightInfo(ctx, &request)
		if err != nil {
			return err
		}
		return sendMessage(&result)

	case flight.RenewFlightEndpointActionType:
		var request flight.RenewFlightEndpointRequest
		if err := proto.Unmarshal(cmd.Body, &request); err != nil {
			// The body is a RenewFlightEndpointRequest, so name that type in
			// the message instead of the FlightEndpoint it refers to.
			return status.Errorf(codes.InvalidArgument, "unable to unmarshal RenewFlightEndpointRequest for RenewFlightEndpoint: %s", err.Error())
		}

		renewedEndpoint, err := f.srv.RenewFlightEndpoint(ctx, &request)
		if err != nil {
			return err
		}
		return sendMessage(renewedEndpoint)

	case BeginSavepointActionType:
		var (
			request pb.ActionBeginSavepointRequest
			result  pb.ActionBeginSavepointResult
		)
		if err := unwrapAny(&request); err != nil {
			return err
		}

		id, err := f.srv.BeginSavepoint(ctx, &request)
		if err != nil {
			return err
		}
		result.SavepointId = id

		out, err := packActionResult(&result)
		if err != nil {
			return err
		}
		return stream.Send(out)

	case BeginTransactionActionType:
		var (
			request pb.ActionBeginTransactionRequest
			result  pb.ActionBeginTransactionResult
		)
		if err := unwrapAny(&request); err != nil {
			return err
		}

		id, err := f.srv.BeginTransaction(ctx, &request)
		if err != nil {
			return err
		}
		result.TransactionId = id

		out, err := packActionResult(&result)
		if err != nil {
			return err
		}
		return stream.Send(out)

	case CancelQueryActionType:
		var (
			//nolint:staticcheck,SA1019 for backward compatibility
			request pb.ActionCancelQueryRequest
			//nolint:staticcheck,SA1019 for backward compatibility
			result pb.ActionCancelQueryResult
			info   flight.FlightInfo
		)
		if err := unwrapAny(&request); err != nil {
			return err
		}

		// The request embeds the FlightInfo to cancel as serialized bytes.
		if err := proto.Unmarshal(request.Info, &info); err != nil {
			return status.Errorf(codes.InvalidArgument, "unable to unmarshal FlightInfo for CancelQuery: %s", err.Error())
		}

		if cancel, ok := f.srv.(cancelQueryServer); ok {
			// The wrapped server implements CancelQuery itself; use it.
			var err error
			if result.Result, err = cancel.CancelQuery(ctx, &cancelQueryRequest{&info}); err != nil {
				return err
			}
		} else {
			// Otherwise route through CancelFlightInfo and translate its
			// status into the CancelQuery result value.
			cancelResult, err := f.srv.CancelFlightInfo(ctx, &flight.CancelFlightInfoRequest{Info: &info})
			if err != nil {
				return err
			}
			result.Result = cancelStatusToCancelResult(cancelResult.Status)
		}

		out, err := packActionResult(&result)
		if err != nil {
			return err
		}
		return stream.Send(out)

	case CreatePreparedStatementActionType:
		var (
			request pb.ActionCreatePreparedStatementRequest
			result  pb.ActionCreatePreparedStatementResult
		)
		if err := unwrapAny(&request); err != nil {
			return err
		}

		output, err := f.srv.CreatePreparedStatement(ctx, &request)
		if err != nil {
			return err
		}

		result.PreparedStatementHandle = output.Handle
		// Schemas are optional; only serialize the ones the server supplied.
		if output.DatasetSchema != nil {
			result.DatasetSchema = flight.SerializeSchema(output.DatasetSchema, f.mem)
		}
		if output.ParameterSchema != nil {
			result.ParameterSchema = flight.SerializeSchema(output.ParameterSchema, f.mem)
		}
		return sendWrapped(&result)

	case CreatePreparedSubstraitPlanActionType:
		var (
			request pb.ActionCreatePreparedSubstraitPlanRequest
			result  pb.ActionCreatePreparedStatementResult
		)
		if err := unwrapAny(&request); err != nil {
			return err
		}

		output, err := f.srv.CreatePreparedSubstraitPlan(ctx, &createPreparedSubstraitPlanReq{&request})
		if err != nil {
			return err
		}

		result.PreparedStatementHandle = output.Handle
		// Schemas are optional; only serialize the ones the server supplied.
		if output.DatasetSchema != nil {
			result.DatasetSchema = flight.SerializeSchema(output.DatasetSchema, f.mem)
		}
		if output.ParameterSchema != nil {
			result.ParameterSchema = flight.SerializeSchema(output.ParameterSchema, f.mem)
		}
		return sendWrapped(&result)

	case ClosePreparedStatementActionType:
		var request pb.ActionClosePreparedStatementRequest
		if err := unwrapAny(&request); err != nil {
			return err
		}

		if err := f.srv.ClosePreparedStatement(ctx, &request); err != nil {
			return err
		}
		return stream.Send(&pb.Result{})

	case EndTransactionActionType:
		var request pb.ActionEndTransactionRequest
		if err := unwrapAny(&request); err != nil {
			return err
		}

		if err := f.srv.EndTransaction(ctx, &request); err != nil {
			return err
		}
		return stream.Send(&pb.Result{})

	case EndSavepointActionType:
		var request pb.ActionEndSavepointRequest
		if err := unwrapAny(&request); err != nil {
			return err
		}

		if err := f.srv.EndSavepoint(ctx, &request); err != nil {
			return err
		}
		return stream.Send(&pb.Result{})

	case flight.SetSessionOptionsActionType:
		var request flight.SetSessionOptionsRequest
		if err := proto.Unmarshal(cmd.Body, &request); err != nil {
			return status.Errorf(codes.InvalidArgument, "unable to unmarshal SetSessionOptionsRequest: %s", err.Error())
		}

		response, err := f.srv.SetSessionOptions(ctx, &request)
		if err != nil {
			return err
		}
		return sendMessage(response)

	case flight.GetSessionOptionsActionType:
		var request flight.GetSessionOptionsRequest
		if err := proto.Unmarshal(cmd.Body, &request); err != nil {
			return status.Errorf(codes.InvalidArgument, "unable to unmarshal GetSessionOptionsRequest: %s", err.Error())
		}

		response, err := f.srv.GetSessionOptions(ctx, &request)
		if err != nil {
			return err
		}
		return sendMessage(response)

	case flight.CloseSessionActionType:
		var request flight.CloseSessionRequest
		if err := proto.Unmarshal(cmd.Body, &request); err != nil {
			return status.Errorf(codes.InvalidArgument, "unable to unmarshal CloseSessionRequest: %s", err.Error())
		}

		response, err := f.srv.CloseSession(ctx, &request)
		if err != nil {
			return err
		}
		return sendMessage(response)

	default:
		return status.Error(codes.InvalidArgument, "the defined request is invalid.")
	}
}