in spark/sql/sparksession.go [122:164]
func (s *sparkSessionImpl) Sql(ctx context.Context, query string) (DataFrame, error) {
// Due to the nature of Spark, we have to first submit the SQL query immediately as a command
// to make sure that all side effects have been executed properly. If no side effects are present,
// then simply prepare this as a SQL relation.
plan := &proto.Plan{
OpType: &proto.Plan_Command{
Command: &proto.Command{
CommandType: &proto.Command_SqlCommand{
SqlCommand: &proto.SqlCommand{
Sql: query,
},
},
},
},
}
// We need an execute command here.
_, _, properties, err := s.client.ExecuteCommand(ctx, plan)
if err != nil {
return nil, sparkerrors.WithType(fmt.Errorf("failed to execute sql: %s: %w", query, err), sparkerrors.ExecutionError)
}
val, ok := properties["sql_command_result"]
if !ok {
plan := &proto.Relation{
Common: &proto.RelationCommon{
PlanId: newPlanId(),
},
RelType: &proto.Relation_Sql{
Sql: &proto.SQL{
Query: query,
},
},
}
return NewDataFrame(s, plan), nil
} else {
rel := val.(*proto.Relation)
rel.Common = &proto.RelationCommon{
PlanId: newPlanId(),
}
return NewDataFrame(s, rel), nil
}
}