in client/sql/dataframe.go [119:161]
// Collect executes the DataFrame's plan on its Spark session and returns every
// row streamed back in the execution responses.
//
// The response stream is read until io.EOF. Whenever a response carries a
// schema, it replaces the schema used to decode subsequent Arrow batches. Every
// Arrow batch is decoded with readArrowBatchData and its rows are appended to
// the result. Responses carrying neither a schema nor an Arrow batch are skipped.
//
// Any error while executing the plan, receiving a response, or decoding a batch
// aborts the collection and is returned wrapped (with a nil row slice).
func (df *dataFrameImpl) Collect() ([]Row, error) {
	responseClient, err := df.sparkSession.executePlan(df.createPlan())
	if err != nil {
		return nil, fmt.Errorf("failed to execute plan: %w", err)
	}

	var schema *StructType
	var allRows []Row
	for {
		response, err := responseClient.Recv()
		if errors.Is(err, io.EOF) {
			// Normal end of the response stream: all rows have been received.
			return allRows, nil
		}
		if err != nil {
			return nil, fmt.Errorf("failed to receive plan execution response: %w", err)
		}

		// Record the schema when present, but do not skip the rest of the
		// response: the schema field is not guaranteed to arrive in a message
		// of its own, and an accompanying Arrow batch must not be dropped.
		if dataType := response.GetSchema(); dataType != nil {
			schema = convertProtoDataTypeToStructType(dataType)
		}

		arrowBatch := response.GetArrowBatch()
		if arrowBatch == nil {
			continue
		}
		rowBatch, err := readArrowBatchData(arrowBatch.Data, schema)
		if err != nil {
			return nil, fmt.Errorf("failed to read arrow batch data: %w", err)
		}
		if allRows == nil {
			// Size the result from the first batch to reduce early re-allocations.
			allRows = make([]Row, 0, len(rowBatch))
		}
		allRows = append(allRows, rowBatch...)
	}
}