in spark/client/client.go [362:435]
// ToTable drains the response stream of the executed plan and assembles the
// received Arrow record batches into a single table.
//
// It returns the schema most recently reported by the server (c.schema)
// together with the table. The table is nil when no Arrow batch was received.
// On any error both the schema and the table are nil.
//
// Side effects visible here: c.done is reset and then set once a
// ResultComplete message arrives, c.schema is overwritten by every response
// that carries a schema, and the relation of a SqlCommandResult is stored in
// c.properties under the key "sql_command_result".
func (c *ExecutePlanClient) ToTable() (*types.StructType, arrow.Table, error) {
	// A nil slice is sufficient: append allocates on first use.
	var recordBatches []arrow.Record
	var arrowSchema *arrow.Schema

	// Explicitly needed when tracking re-attachable execution.
	c.done = false
	for {
		resp, err := c.responseStream.Recv()
		// EOF is received when the last message has been processed and the
		// stream finished normally.
		if errors.Is(err, io.EOF) {
			break
		}
		// Any other error reported by the stream aborts the read.
		if se := sparkerrors.FromRPCError(err); se != nil {
			return nil, nil, sparkerrors.WithType(se, sparkerrors.ExecutionError)
		}

		// Check that the server returned the session ID that we were expecting
		// and that it has not changed. Like every other failure path, this one
		// returns no schema alongside the error.
		if resp.GetSessionId() != c.sessionId {
			return nil, nil, sparkerrors.WithType(&sparkerrors.InvalidServerSideSessionDetailsError{
				OwnSessionId:      c.sessionId,
				ReceivedSessionId: resp.GetSessionId(),
			}, sparkerrors.InvalidServerSideSessionError)
		}

		// If the response carries a schema, convert the proto DataType to a
		// StructType and remember it on the client.
		if resp.Schema != nil {
			c.schema, err = types.ConvertProtoDataTypeToStructType(resp.Schema)
			if err != nil {
				return nil, nil, sparkerrors.WithType(err, sparkerrors.ExecutionError)
			}
		}

		switch x := resp.ResponseType.(type) {
		case *proto.ExecutePlanResponse_SqlCommandResult_:
			if val := x.SqlCommandResult.GetRelation(); val != nil {
				c.properties["sql_command_result"] = val
			}
		case *proto.ExecutePlanResponse_ArrowBatch_:
			// Decode the batch and collect it for the final table.
			record, err := types.ReadArrowBatchToRecord(x.ArrowBatch.Data, c.schema)
			if err != nil {
				return nil, nil, err
			}
			// The schema of the most recent batch is used to build the table.
			arrowSchema = record.Schema()
			// NOTE(review): this extra reference is never released in this
			// function (neither on error paths nor after the table is built);
			// confirm the ownership contract of ReadArrowBatchToRecord.
			record.Retain()
			recordBatches = append(recordBatches, record)
		case *proto.ExecutePlanResponse_ResultComplete_:
			c.done = true
		default:
			// Explicitly ignore messages that we cannot process at the moment.
		}
	}

	// Check that the result is logically complete. The result might not be
	// complete because after 2 minutes the server will interrupt the
	// connection, and we have to send a ReAttach execute request.
	if c.opts.ReattachExecution && !c.done {
		return nil, nil, sparkerrors.WithType(fmt.Errorf("the result is not complete"), sparkerrors.ExecutionError)
	}

	// No Arrow batch was received: there is a schema at most, but no table.
	if arrowSchema == nil {
		return c.schema, nil, nil
	}
	return c.schema, array.NewTableFromRecords(arrowSchema, recordBatches), nil
}