in spark/sql/sparksession.go [174:228]
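// CreateDataFrameFromArrow builds a DataFrame from an in-memory arrow.Table by
// serializing it into an Arrow IPC stream, embedding the bytes in a
// LocalRelation plan node, and wrapping that in a ToDF node carrying the
// column names.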
func (s *sparkSessionImpl) CreateDataFrameFromArrow(ctx context.Context, data arrow.Table) (DataFrame, error) {
// Generate the schema.
// schema := types.ArrowSchemaToProto(data.Schema())
// schemaString := ""
// TODO: PySpark performs extensive casting here to derive the schema; that
// conversion logic does not exist in this client yet, so the schema is left unset.
// Convert the Arrow Table into a byte array of Arrow IPC stream messages.
buf := new(bytes.Buffer)
w := ipc.NewWriter(buf, ipc.WithSchema(data.Schema()))
defer w.Close()
// Create a RecordReader over the full table (chunk size = total row count).
rr := array.NewTableReader(data, int64(data.NumRows()))
defer rr.Release()
// Read the records from the table and write them to the buffer.
for rr.Next() {
record := rr.Record()
if err := w.Write(record); err != nil {
return nil, sparkerrors.WithType(fmt.Errorf("failed to write record: %w", err), sparkerrors.WriteError)
}
}
// Close the writer before buf.Bytes() is read below so that the IPC
// end-of-stream marker is flushed into the buffer; the deferred Close
// above then becomes a no-op.
if err := w.Close(); err != nil {
return nil, sparkerrors.WithType(fmt.Errorf("failed to close Arrow IPC writer: %w", err), sparkerrors.WriteError)
}
// Create a local relation object
plan := &proto.Relation{
Common: &proto.RelationCommon{
PlanId: newPlanId(),
},
RelType: &proto.Relation_LocalRelation{
LocalRelation: &proto.LocalRelation{
// Schema: &schemaString,
Data: buf.Bytes(),
},
},
}
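// Note that the whole serialized table travels inline in the plan message, so
// very large tables may run into transport (gRPC) message-size limits.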
// Capture the column names from the schema:
columnNames := make([]string, data.NumCols())
for i, field := range data.Schema().Fields() {
columnNames[i] = field.Name
}
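// Wrap the LocalRelation in a ToDF node so the resulting DataFrame exposes
// the original column names.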
dfPlan := &proto.Relation{
Common: &proto.RelationCommon{
PlanId: newPlanId(),
},
RelType: &proto.Relation_ToDf{
ToDf: &proto.ToDF{
Input: plan,
ColumnNames: columnNames,
},
},
}
return NewDataFrame(s, dfPlan), nil
}
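// Example usage (an illustrative sketch, not code from this file; the session
// variable `spark` is an assumption about a typical caller, e.g. one built via
// the client's session builder, while the arrow-go calls are standard):
//
//	pool := memory.NewGoAllocator()
//	schema := arrow.NewSchema([]arrow.Field{{Name: "id", Type: arrow.PrimitiveTypes.Int64}}, nil)
//	rb := array.NewRecordBuilder(pool, schema)
//	defer rb.Release()
//	rb.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
//	rec := rb.NewRecord()
//	tbl := array.NewTableFromRecords(schema, []arrow.Record{rec})
//	df, err := spark.CreateDataFrameFromArrow(ctx, tbl)
//	if err != nil { /* handle error */ }
//	_ = df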