in client/sql/sparksession.go [91:122]
// Sql wraps query in a SqlCommand plan, passes it to s.executePlan, and
// returns a DataFrame built from the relation of the first response that
// carries a SqlCommandResult.
//
// Responses without a SqlCommandResult are skipped. An error is returned when
// s.executePlan fails or when receiving from the response stream fails; both
// wrap the underlying error with %w.
//
// NOTE(review): a stream that ends before any SqlCommandResult arrives is
// presumably reported by Recv as an end-of-stream error and therefore
// surfaces as the "failed to receive ExecutePlan response" error -- confirm
// against the stream client returned by executePlan.
func (s *sparkSessionImpl) Sql(query string) (DataFrame, error) {
	plan := &proto.Plan{
		OpType: &proto.Plan_Command{
			Command: &proto.Command{
				CommandType: &proto.Command_SqlCommand{
					SqlCommand: &proto.SqlCommand{
						Sql: query,
					},
				},
			},
		},
	}

	responseClient, err := s.executePlan(plan)
	if err != nil {
		return nil, fmt.Errorf("failed to execute sql: %s: %w", query, err)
	}

	// The loop has no condition and no break: it exits only through one of the
	// returns below, so no statement after it can ever run.
	for {
		response, recvErr := responseClient.Recv()
		if recvErr != nil {
			return nil, fmt.Errorf("failed to receive ExecutePlan response: %w", recvErr)
		}

		// Keep reading until a response carries the SQL command result.
		sqlCommandResult := response.GetSqlCommandResult()
		if sqlCommandResult == nil {
			continue
		}

		return &dataFrameImpl{
			sparkSession: s,
			relation:     sqlCommandResult.GetRelation(),
		}, nil
	}
}