func (s *sparkSessionImpl) CreateDataFrameFromArrow()

in spark/sql/sparksession.go [174:228]


func (s *sparkSessionImpl) CreateDataFrameFromArrow(ctx context.Context, data arrow.Table) (DataFrame, error) {
	// Generate the schema.
	// schema := types.ArrowSchemaToProto(data.Schema())
	// schemaString := ""
	// TODO: PySpark performs a number of casts here to convert the Arrow schema
	// to the types Spark expects; that conversion does not exist yet, so the
	// schema is left unset on the relation below.

	// Serialize the Arrow table into a buffer of Arrow IPC stream messages.
	buf := new(bytes.Buffer)
	w := ipc.NewWriter(buf, ipc.WithSchema(data.Schema()))
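	// The IPC writer emits the schema message first, then one message per
	// record batch; LocalRelation.Data below is parsed as exactly this
	// Arrow IPC streaming format.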

	// Create a RecordReader that yields the whole table as one record batch.
	// (arrow.Table.NumRows already returns an int64, so no conversion is needed.)
	rr := array.NewTableReader(data, data.NumRows())
	defer rr.Release()

	// Read the records from the table and write them to the buffer.
	for rr.Next() {
		record := rr.Record()
		if err := w.Write(record); err != nil {
			return nil, sparkerrors.WithType(fmt.Errorf("failed to write record: %w", err), sparkerrors.WriteError)
		}
	}

	// Close the writer before buf.Bytes() is read so the stream is terminated
	// with the end-of-stream marker; a deferred Close would only run after the
	// buffer has already been copied into the plan.
	if err := w.Close(); err != nil {
		return nil, sparkerrors.WithType(fmt.Errorf("failed to close Arrow IPC writer: %w", err), sparkerrors.WriteError)
	}

	// Create a local relation object
	plan := &proto.Relation{
		Common: &proto.RelationCommon{
			PlanId: newPlanId(),
		},
		RelType: &proto.Relation_LocalRelation{
			LocalRelation: &proto.LocalRelation{
				// Schema: &schemaString,
				Data: buf.Bytes(),
			},
		},
	}

	// Capture the column names from the schema.
	columnNames := make([]string, data.NumCols())
	for i, field := range data.Schema().Fields() {
		columnNames[i] = field.Name
	}

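	// Wrap the local relation in a ToDF relation so that the captured column
	// names are applied to the resulting DataFrame.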
	dfPlan := &proto.Relation{
		Common: &proto.RelationCommon{
			PlanId: newPlanId(),
		},
		RelType: &proto.Relation_ToDf{
			ToDf: &proto.ToDF{
				Input:       plan,
				ColumnNames: columnNames,
			},
		},
	}
	return NewDataFrame(s, dfPlan), nil
}
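
For context, a caller would build an in-memory Arrow table and hand it to this method. The sketch below is a minimal, hypothetical usage example: it assumes the arrow-go v12 module paths and a session value satisfying the SparkSession interface obtained elsewhere (e.g. from the session builder); exact names and module versions may differ in your checkout of spark-connect-go.

// Assumed imports (module version may differ):
//   "context"
//   "github.com/apache/arrow/go/v12/arrow"
//   "github.com/apache/arrow/go/v12/arrow/array"
//   "github.com/apache/arrow/go/v12/arrow/memory"

func exampleCreateDataFrame(ctx context.Context, spark SparkSession) (DataFrame, error) {
	// Build a one-column Arrow table: id INT64 with three rows.
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "id", Type: arrow.PrimitiveTypes.Int64},
	}, nil)
	rb := array.NewRecordBuilder(memory.DefaultAllocator, schema)
	defer rb.Release()
	rb.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
	rec := rb.NewRecord()
	defer rec.Release()

	// NewTableFromRecords retains the record, so releasing rec above is safe.
	table := array.NewTableFromRecords(schema, []arrow.Record{rec})
	defer table.Release()

	// The table is serialized into a LocalRelation and wrapped in a ToDF plan.
	return spark.CreateDataFrameFromArrow(ctx, table)
}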