func()

in client/sql/dataframe.go [119:161]


func (df *dataFrameImpl) Collect() ([]Row, error) {
	responseClient, err := df.sparkSession.executePlan(df.createPlan())
	if err != nil {
		return nil, fmt.Errorf("failed to execute plan: %w", err)
	}

	var schema *StructType
	var allRows []Row

	for {
		response, err := responseClient.Recv()
		if err != nil {
			if errors.Is(err, io.EOF) {
				return allRows, nil
			} else {
				return nil, fmt.Errorf("failed to receive plan execution response: %w", err)
			}
		}

		dataType := response.GetSchema()
		if dataType != nil {
			schema = convertProtoDataTypeToStructType(dataType)
			continue
		}

		arrowBatch := response.GetArrowBatch()
		if arrowBatch == nil {
			continue
		}

		rowBatch, err := readArrowBatchData(arrowBatch.Data, schema)
		if err != nil {
			return nil, err
		}

		if allRows == nil {
			allRows = make([]Row, 0, len(rowBatch))
		}
		allRows = append(allRows, rowBatch...)
	}

	return allRows, nil
}