in spark/client/client.go [76:98]
// ExecuteCommand submits a command plan through the ExecutePlan RPC and
// collects the response stream into a table.
//
// It returns the resulting table, its schema, and the properties reported by
// the response handler. An error typed as sparkerrors.ExecutionError is
// returned when plan carries no command or when the ExecutePlan call itself
// fails; an error produced while converting the response stream is returned
// unchanged.
func (s *sparkConnectClientImpl) ExecuteCommand(ctx context.Context, plan *proto.Plan) (arrow.Table, *types.StructType, map[string]any, error) {
	// Validate before building anything: a plan without a command is never sent.
	if plan.GetCommand() == nil {
		return nil, nil, nil, sparkerrors.WithType(
			fmt.Errorf("the supplied plan does not contain a command"), sparkerrors.ExecutionError)
	}
	request := s.newExecutePlanRequest(plan)

	// Attach the client's metadata to the outgoing call. The derived context is
	// cancelled on return so the stream is released even if ToTable bails out
	// before draining it; the response handler is not used past this function.
	ctx, cancel := context.WithCancel(metadata.NewOutgoingContext(ctx, s.metadata))
	defer cancel()

	c, err := s.client.ExecutePlan(ctx, request)
	if err != nil {
		return nil, nil, nil, sparkerrors.WithType(
			fmt.Errorf("failed to call ExecutePlan in session %s: %w", s.sessionId, err), sparkerrors.ExecutionError)
	}

	// NOTE(review): assumes newExecutePlanRequest always sets OperationId; a nil
	// pointer here would panic — confirm against the helper.
	respHandler := NewExecuteResponseStream(c, s.sessionId, *request.OperationId, s.opts)
	schema, table, err := respHandler.ToTable()
	if err != nil {
		return nil, nil, nil, err
	}
	return table, schema, respHandler.Properties(), nil
}