arrow/array/record.go (355 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package array import ( "bytes" "fmt" "iter" "strings" "sync/atomic" "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/internal/debug" "github.com/apache/arrow-go/v18/arrow/memory" "github.com/apache/arrow-go/v18/internal/json" ) // RecordReader reads a stream of records. type RecordReader interface { Retain() Release() Schema() *arrow.Schema Next() bool Record() arrow.Record Err() error } // simpleRecords is a simple iterator over a collection of records. type simpleRecords struct { refCount atomic.Int64 schema *arrow.Schema recs []arrow.Record cur arrow.Record } // NewRecordReader returns a simple iterator over the given slice of records. func NewRecordReader(schema *arrow.Schema, recs []arrow.Record) (RecordReader, error) { rs := &simpleRecords{ schema: schema, recs: recs, cur: nil, } rs.refCount.Add(1) for _, rec := range rs.recs { rec.Retain() } for _, rec := range recs { if !rec.Schema().Equal(rs.schema) { rs.Release() return nil, fmt.Errorf("arrow/array: mismatch schema") } } return rs, nil } // Retain increases the reference count by 1. // Retain may be called simultaneously from multiple goroutines. func (rs *simpleRecords) Retain() { rs.refCount.Add(1) } // Release decreases the reference count by 1. // When the reference count goes to zero, the memory is freed. // Release may be called simultaneously from multiple goroutines. func (rs *simpleRecords) Release() { debug.Assert(rs.refCount.Load() > 0, "too many releases") if rs.refCount.Add(-1) == 0 { if rs.cur != nil { rs.cur.Release() } for _, rec := range rs.recs { rec.Release() } rs.recs = nil } } func (rs *simpleRecords) Schema() *arrow.Schema { return rs.schema } func (rs *simpleRecords) Record() arrow.Record { return rs.cur } func (rs *simpleRecords) Next() bool { if len(rs.recs) == 0 { return false } if rs.cur != nil { rs.cur.Release() } rs.cur = rs.recs[0] rs.recs = rs.recs[1:] return true } func (rs *simpleRecords) Err() error { return nil } // simpleRecord is a basic, non-lazy in-memory record batch. type simpleRecord struct { refCount atomic.Int64 schema *arrow.Schema rows int64 arrs []arrow.Array } // NewRecord returns a basic, non-lazy in-memory record batch. // // NewRecord panics if the columns and schema are inconsistent. // NewRecord panics if rows is larger than the height of the columns. func NewRecord(schema *arrow.Schema, cols []arrow.Array, nrows int64) arrow.Record { rec := &simpleRecord{ schema: schema, rows: nrows, arrs: make([]arrow.Array, len(cols)), } rec.refCount.Add(1) copy(rec.arrs, cols) for _, arr := range rec.arrs { arr.Retain() } if rec.rows < 0 { switch len(rec.arrs) { case 0: rec.rows = 0 default: rec.rows = int64(rec.arrs[0].Len()) } } err := rec.validate() if err != nil { rec.Release() panic(err) } return rec } func (rec *simpleRecord) SetColumn(i int, arr arrow.Array) (arrow.Record, error) { if i < 0 || i >= len(rec.arrs) { return nil, fmt.Errorf("arrow/array: column index out of range [0, %d): got=%d", len(rec.arrs), i) } if arr.Len() != int(rec.rows) { return nil, fmt.Errorf("arrow/array: mismatch number of rows in column %q: got=%d, want=%d", rec.schema.Field(i).Name, arr.Len(), rec.rows, ) } f := rec.schema.Field(i) if !arrow.TypeEqual(f.Type, arr.DataType()) { return nil, fmt.Errorf("arrow/array: column %q type mismatch: got=%v, want=%v", f.Name, arr.DataType(), f.Type, ) } arrs := make([]arrow.Array, len(rec.arrs)) copy(arrs, rec.arrs) arrs[i] = arr return NewRecord(rec.schema, arrs, rec.rows), nil } func (rec *simpleRecord) validate() error { if rec.rows == 0 && len(rec.arrs) == 0 { return nil } if len(rec.arrs) != rec.schema.NumFields() { return fmt.Errorf("arrow/array: number of columns/fields mismatch") } for i, arr := range rec.arrs { f := rec.schema.Field(i) if int64(arr.Len()) < rec.rows { return fmt.Errorf("arrow/array: mismatch number of rows in column %q: got=%d, want=%d", f.Name, arr.Len(), rec.rows, ) } if !arrow.TypeEqual(f.Type, arr.DataType()) { return fmt.Errorf("arrow/array: column %q type mismatch: got=%v, want=%v", f.Name, arr.DataType(), f.Type, ) } } return nil } // Retain increases the reference count by 1. // Retain may be called simultaneously from multiple goroutines. func (rec *simpleRecord) Retain() { rec.refCount.Add(1) } // Release decreases the reference count by 1. // When the reference count goes to zero, the memory is freed. // Release may be called simultaneously from multiple goroutines. func (rec *simpleRecord) Release() { debug.Assert(rec.refCount.Load() > 0, "too many releases") if rec.refCount.Add(-1) == 0 { for _, arr := range rec.arrs { arr.Release() } rec.arrs = nil } } func (rec *simpleRecord) Schema() *arrow.Schema { return rec.schema } func (rec *simpleRecord) NumRows() int64 { return rec.rows } func (rec *simpleRecord) NumCols() int64 { return int64(len(rec.arrs)) } func (rec *simpleRecord) Columns() []arrow.Array { return rec.arrs } func (rec *simpleRecord) Column(i int) arrow.Array { return rec.arrs[i] } func (rec *simpleRecord) ColumnName(i int) string { return rec.schema.Field(i).Name } // NewSlice constructs a zero-copy slice of the record with the indicated // indices i and j, corresponding to array[i:j]. // The returned record must be Release()'d after use. // // NewSlice panics if the slice is outside the valid range of the record array. // NewSlice panics if j < i. func (rec *simpleRecord) NewSlice(i, j int64) arrow.Record { arrs := make([]arrow.Array, len(rec.arrs)) for ii, arr := range rec.arrs { arrs[ii] = NewSlice(arr, i, j) } defer func() { for _, arr := range arrs { arr.Release() } }() return NewRecord(rec.schema, arrs, j-i) } func (rec *simpleRecord) String() string { o := new(strings.Builder) fmt.Fprintf(o, "record:\n %v\n", rec.schema) fmt.Fprintf(o, " rows: %d\n", rec.rows) for i, col := range rec.arrs { fmt.Fprintf(o, " col[%d][%s]: %v\n", i, rec.schema.Field(i).Name, col) } return o.String() } func (rec *simpleRecord) MarshalJSON() ([]byte, error) { arr := RecordToStructArray(rec) defer arr.Release() return arr.MarshalJSON() } // RecordBuilder eases the process of building a Record, iteratively, from // a known Schema. type RecordBuilder struct { refCount atomic.Int64 mem memory.Allocator schema *arrow.Schema fields []Builder } // NewRecordBuilder returns a builder, using the provided memory allocator and a schema. func NewRecordBuilder(mem memory.Allocator, schema *arrow.Schema) *RecordBuilder { b := &RecordBuilder{ mem: mem, schema: schema, fields: make([]Builder, schema.NumFields()), } b.refCount.Add(1) for i := 0; i < schema.NumFields(); i++ { b.fields[i] = NewBuilder(b.mem, schema.Field(i).Type) } return b } // Retain increases the reference count by 1. // Retain may be called simultaneously from multiple goroutines. func (b *RecordBuilder) Retain() { b.refCount.Add(1) } // Release decreases the reference count by 1. func (b *RecordBuilder) Release() { debug.Assert(b.refCount.Load() > 0, "too many releases") if b.refCount.Add(-1) == 0 { for _, f := range b.fields { f.Release() } b.fields = nil } } func (b *RecordBuilder) Schema() *arrow.Schema { return b.schema } func (b *RecordBuilder) Fields() []Builder { return b.fields } func (b *RecordBuilder) Field(i int) Builder { return b.fields[i] } func (b *RecordBuilder) Reserve(size int) { for _, f := range b.fields { f.Reserve(size) } } // NewRecord creates a new record from the memory buffers and resets the // RecordBuilder so it can be used to build a new record. // // The returned Record must be Release()'d after use. // // NewRecord panics if the fields' builder do not have the same length. func (b *RecordBuilder) NewRecord() arrow.Record { cols := make([]arrow.Array, len(b.fields)) rows := int64(0) defer func(cols []arrow.Array) { for _, col := range cols { if col == nil { continue } col.Release() } }(cols) for i, f := range b.fields { cols[i] = f.NewArray() irow := int64(cols[i].Len()) if i > 0 && irow != rows { panic(fmt.Errorf("arrow/array: field %d has %d rows. want=%d", i, irow, rows)) } rows = irow } return NewRecord(b.schema, cols, rows) } // UnmarshalJSON for record builder will read in a single object and add the values // to each field in the recordbuilder, missing fields will get a null and unexpected // keys will be ignored. If reading in an array of records as a single batch, then use // a structbuilder and use RecordFromStruct. func (b *RecordBuilder) UnmarshalJSON(data []byte) error { dec := json.NewDecoder(bytes.NewReader(data)) // should start with a '{' t, err := dec.Token() if err != nil { return err } if delim, ok := t.(json.Delim); !ok || delim != '{' { return fmt.Errorf("record should start with '{', not %s", t) } keylist := make(map[string]bool) for dec.More() { keyTok, err := dec.Token() if err != nil { return err } key := keyTok.(string) if keylist[key] { return fmt.Errorf("key %s shows up twice in row to be decoded", key) } keylist[key] = true indices := b.schema.FieldIndices(key) if len(indices) == 0 { var extra interface{} if err := dec.Decode(&extra); err != nil { return err } continue } if err := b.fields[indices[0]].UnmarshalOne(dec); err != nil { return err } } for i := 0; i < b.schema.NumFields(); i++ { if !keylist[b.schema.Field(i).Name] { b.fields[i].AppendNull() } } return nil } type iterReader struct { refCount atomic.Int64 schema *arrow.Schema cur arrow.Record next func() (arrow.Record, error, bool) stop func() err error } func (ir *iterReader) Schema() *arrow.Schema { return ir.schema } func (ir *iterReader) Retain() { ir.refCount.Add(1) } func (ir *iterReader) Release() { debug.Assert(ir.refCount.Load() > 0, "too many releases") if ir.refCount.Add(-1) == 0 { ir.stop() ir.schema, ir.next = nil, nil if ir.cur != nil { ir.cur.Release() } } } func (ir *iterReader) Record() arrow.Record { return ir.cur } func (ir *iterReader) Err() error { return ir.err } func (ir *iterReader) Next() bool { if ir.cur != nil { ir.cur.Release() } var ok bool ir.cur, ir.err, ok = ir.next() if ir.err != nil { ir.stop() return false } return ok } // ReaderFromIter wraps a go iterator for arrow.Record + error into a RecordReader // interface object for ease of use. func ReaderFromIter(schema *arrow.Schema, itr iter.Seq2[arrow.Record, error]) RecordReader { next, stop := iter.Pull2(itr) rdr := &iterReader{ schema: schema, next: next, stop: stop, } rdr.refCount.Add(1) return rdr } // IterFromReader converts a RecordReader interface into an iterator that // you can use range on. The semantics are still important, if a record // that is returned is desired to be utilized beyond the scope of an iteration // then Retain must be called on it. func IterFromReader(rdr RecordReader) iter.Seq2[arrow.Record, error] { rdr.Retain() return func(yield func(arrow.Record, error) bool) { defer rdr.Release() for rdr.Next() { if !yield(rdr.Record(), nil) { return } } if rdr.Err() != nil { yield(nil, rdr.Err()) } } } var ( _ arrow.Record = (*simpleRecord)(nil) _ RecordReader = (*simpleRecords)(nil) )