// arrow/avro/reader.go

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package avro

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync/atomic"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/internal/debug"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/hamba/avro/v2/ocf"
	"github.com/tidwall/sjson"

	avro "github.com/hamba/avro/v2"
)

// ErrMismatchFields is returned when the number of records does not match
// between the Avro data and the Arrow schema.
var ErrMismatchFields = errors.New("arrow/avro: number of records mismatch")

// Option configures an Avro reader/writer.
type (
	// Option is a functional option applied to an OCFReader at construction
	// (or on Reuse).
	Option func(config)
	// config is the option target; options mutate the *OCFReader directly.
	config *OCFReader
)

// schemaEdit describes a single modification to the Avro JSON schema,
// applied with sjson before the schema is converted to Arrow.
type schemaEdit struct {
	method string // edit kind: "set" or "delete" (see editAvroSchema)
	path   string // sjson dot-syntax path, e.g. "fields.0.type"
	value  any    // value for "set"; unused for "delete"
}

// Reader wraps goavro/OCFReader and creates array.Records from a schema.
type OCFReader struct {
	r               *ocf.Decoder // underlying Avro OCF decoder
	avroSchema      string       // canonical (possibly edited) Avro JSON schema
	avroSchemaEdits []schemaEdit // edits applied to avroSchema before conversion
	schema          *arrow.Schema

	refs atomic.Int64 // reference count for Retain/Release

	bld    *array.RecordBuilder
	bldMap *fieldPos   // builder-position map; type defined elsewhere in this package
	ldr    *dataLoader // loads decoded Avro datums into builders; defined elsewhere

	cur arrow.Record // record returned by Record(); valid until next call to Next
	err error        // last error; surfaced via Err()

	primed     bool
	readerCtx  context.Context // cancellation scope for the decode/build goroutines
	readCancel func()

	maxOCF int // observed max depth of avroChan (see Metrics)
	maxRec int // observed max depth of recChan (see Metrics)

	avroChan       chan any // queue of decoded Avro datums
	avroDatumCount int64    // number of datums read from the OCF file
	avroChanSize   int      // capacity of avroChan (default 500)

	recChan     chan arrow.Record // queue of converted Arrow records
	bldDone     chan struct{}     // closed when the record factory has finished
	recChanSize int               // capacity of recChan (default 10)

	chunk int // rows per record: <=1 row-per-record, >1 chunked, <0 one big record
	mem   memory.Allocator
}

// NewOCFReader returns a reader that reads from an Avro OCF file and creates
// arrow.Records from the converted avro data.
//
// Options may adjust queue sizes, chunking, the allocator, and schema edits.
// On success two goroutines are started: one decoding Avro datums into
// avroChan and one building Arrow records into recChan.
func NewOCFReader(r io.Reader, opts ...Option) (*OCFReader, error) {
	ocfr, err := ocf.NewDecoder(r)
	if err != nil {
		return nil, fmt.Errorf("%w: could not create avro ocfreader", arrow.ErrInvalid)
	}
	rr := &OCFReader{
		r:            ocfr,
		chunk:        1,
		avroChanSize: 500,
		recChanSize:  10,
	}
	rr.refs.Add(1)
	// Apply options before sizing the channels so WithReadCacheSize /
	// WithRecordCacheSize take effect.
	for _, opt := range opts {
		opt(rr)
	}
	rr.avroChan = make(chan any, rr.avroChanSize)
	rr.recChan = make(chan arrow.Record, rr.recChanSize)
	rr.bldDone = make(chan struct{})
	// The writer's schema is stored in the OCF header metadata.
	schema, err := avro.Parse(string(ocfr.Metadata()["avro.schema"]))
	if err != nil {
		return nil, fmt.Errorf("%w: could not parse avro header", arrow.ErrInvalid)
	}
	rr.avroSchema = schema.String()
	if len(rr.avroSchemaEdits) > 0 {
		// execute schema edits
		for _, e := range rr.avroSchemaEdits {
			err := rr.editAvroSchema(e)
			if err != nil {
				return nil, fmt.Errorf("%w: could not edit avro schema", arrow.ErrInvalid)
			}
		}
		// validate edited schema
		schema, err = avro.Parse(rr.avroSchema)
		if err != nil {
			return nil, fmt.Errorf("%w: could not parse modified avro schema", arrow.ErrInvalid)
		}
	}
	rr.schema, err = ArrowSchemaFromAvro(schema)
	if err != nil {
		return nil, fmt.Errorf("%w: could not convert avro schema", arrow.ErrInvalid)
	}
	if rr.mem == nil {
		rr.mem = memory.DefaultAllocator
	}
	rr.readerCtx, rr.readCancel = context.WithCancel(context.Background())
	go rr.decodeOCFToChan()
	rr.bld = array.NewRecordBuilder(rr.mem, rr.schema)
	rr.bldMap = newFieldPos()
	rr.ldr = newDataLoader()
	// Map each top-level Arrow field builder into the builder-position tree.
	for idx, fb := range rr.bld.Fields() {
		mapFieldBuilders(fb, rr.schema.Field(idx), rr.bldMap)
	}
	rr.ldr.drawTree(rr.bldMap)
	go rr.recordFactory()
	return rr, nil
}

// Reuse allows the OCFReader to be reused to read another Avro file provided the
// new Avro file has an identical schema.
//
// It closes the current reader, resets counters and queues, and restarts the
// decode/build goroutines. Returns an error if the new file's schema differs
// from the one this reader was built for.
func (rr *OCFReader) Reuse(r io.Reader, opts ...Option) error {
	rr.Close()
	rr.err = nil
	ocfr, err := ocf.NewDecoder(r)
	if err != nil {
		return fmt.Errorf("%w: could not create avro ocfreader", arrow.ErrInvalid)
	}
	schema, err := avro.Parse(string(ocfr.Metadata()["avro.schema"]))
	if err != nil {
		return fmt.Errorf("%w: could not parse avro header", arrow.ErrInvalid)
	}
	// NOTE(review): comparison is against the (possibly edited) stored schema
	// string; a file whose raw schema matches pre-edit would be rejected here.
	if rr.avroSchema != schema.String() {
		return fmt.Errorf("%w: avro schema mismatch", arrow.ErrInvalid)
	}
	rr.r = ocfr
	for _, opt := range opts {
		opt(rr)
	}
	rr.maxOCF = 0
	rr.maxRec = 0
	rr.avroDatumCount = 0
	rr.primed = false
	rr.avroChan = make(chan any, rr.avroChanSize)
	rr.recChan = make(chan arrow.Record, rr.recChanSize)
	rr.bldDone = make(chan struct{})
	rr.readerCtx, rr.readCancel = context.WithCancel(context.Background())
	go rr.decodeOCFToChan()
	go rr.recordFactory()
	return nil
}

// Err returns the last error encountered during the iteration over the
// underlying Avro file.
func (r *OCFReader) Err() error { return r.err }

// AvroSchema returns the Avro schema of the Avro OCF
func (r *OCFReader) AvroSchema() string { return r.avroSchema }

// Schema returns the converted Arrow schema of the Avro OCF
func (r *OCFReader) Schema() *arrow.Schema { return r.schema }

// Record returns the current record that has been extracted from the
// underlying Avro OCF file.
// It is valid until the next call to Next.
func (r *OCFReader) Record() arrow.Record { return r.cur }

// Metrics returns the maximum queue depth of the Avro record read cache and of the
// converted Arrow record cache.
func (r *OCFReader) Metrics() string {
	return fmt.Sprintf("Max. OCF queue depth: %d/%d Max. record queue depth: %d/%d", r.maxOCF, r.avroChanSize, r.maxRec, r.recChanSize)
}

// OCFRecordsReadCount returns the number of Avro datum that were read from the Avro file.
func (r *OCFReader) OCFRecordsReadCount() int64 { return r.avroDatumCount }

// Close closes the OCFReader's Avro record read cache and converted Arrow record cache. OCFReader must
// be closed if the Avro OCF's records have not been read to completion.
//
// NOTE(review): after cancellation, readerCtx.Err() is non-nil
// (context.Canceled), so Err() will report it even on a deliberate Close.
func (r *OCFReader) Close() {
	r.readCancel()
	r.err = r.readerCtx.Err()
}

// editAvroSchema applies one schemaEdit to the stored Avro JSON schema using
// sjson. Supported methods are "set" and "delete"; any other method is an
// error. The edited schema is re-validated by the caller.
func (r *OCFReader) editAvroSchema(e schemaEdit) error {
	var err error
	switch e.method {
	case "set":
		r.avroSchema, err = sjson.Set(r.avroSchema, e.path, e.value)
		if err != nil {
			return fmt.Errorf("%w: schema edit 'set %s = %v' failure - %v", arrow.ErrInvalid, e.path, e.value, err)
		}
	case "delete":
		r.avroSchema, err = sjson.Delete(r.avroSchema, e.path)
		if err != nil {
			return fmt.Errorf("%w: schema edit 'delete' failure - %v", arrow.ErrInvalid, err)
		}
	default:
		return fmt.Errorf("%w: schema edit method must be 'set' or 'delete'", arrow.ErrInvalid)
	}
	return nil
}

// Next returns whether a Record can be received from the converted record queue.
// The user should check Err() after call to Next that return false to check
// if an error took place.
func (r *OCFReader) Next() bool {
	// Release the previous record; Record() results are only valid until the
	// next call to Next.
	if r.cur != nil {
		r.cur.Release()
		r.cur = nil
	}
	// Track high-water marks for Metrics().
	if r.maxOCF < len(r.avroChan) {
		r.maxOCF = len(r.avroChan)
	}
	if r.maxRec < len(r.recChan) {
		r.maxRec = len(r.recChan)
	}
	// Block until either a record arrives or the builder signals completion;
	// on completion, drain any record still buffered in the queue.
	select {
	case r.cur = <-r.recChan:
	case <-r.bldDone:
		if len(r.recChan) > 0 {
			r.cur = <-r.recChan
		}
	}
	if r.err != nil {
		return false
	}
	return r.cur != nil
}

// WithAllocator specifies the Arrow memory allocator used while building records.
func WithAllocator(mem memory.Allocator) Option {
	return func(cfg config) {
		cfg.mem = mem
	}
}

// WithReadCacheSize specifies the size of the OCF record decode queue, default value
// is 500.
func WithReadCacheSize(n int) Option {
	// Fall back to the default of 500 for non-positive sizes.
	size := n
	if size < 1 {
		size = 500
	}
	return func(cfg config) {
		cfg.avroChanSize = size
	}
}

// WithRecordCacheSize specifies the size of the converted Arrow record queue, default
// value is 1.
func WithRecordCacheSize(n int) Option {
	// Fall back to a queue depth of 1 for non-positive sizes.
	size := n
	if size < 1 {
		size = 1
	}
	return func(cfg config) {
		cfg.recChanSize = size
	}
}

// WithSchemaEdit specifies modifications to the Avro schema. Supported methods are 'set' and
// 'delete'. Set sets the value for the specified path. Delete deletes the value for the specified path.
// A path is in dot syntax, such as "fields.1" or "fields.0.type". The modified Avro schema is
// validated before conversion to Arrow schema - NewOCFReader will return an error if the modified schema
// cannot be parsed.
func WithSchemaEdit(method, path string, value any) Option {
	edit := schemaEdit{method: method, path: path, value: value}
	return func(cfg config) {
		cfg.avroSchemaEdits = append(cfg.avroSchemaEdits, edit)
	}
}

// WithChunk specifies the chunk size used while reading Avro OCF files.
//
// If n is zero or 1, no chunking will take place and the reader will create
// one record per row.
// If n is greater than 1, chunks of n rows will be read.
// If n is negative, the reader will load the whole Avro OCF file into memory and
// create one big record with all the rows.
func WithChunk(n int) Option {
	return func(cfg config) {
		cfg.chunk = n
	}
}

// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (r *OCFReader) Retain() {
	r.refs.Add(1)
}

// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (r *OCFReader) Release() {
	debug.Assert(r.refs.Load() > 0, "too many releases")
	// On the final release, drop the currently held record (if any).
	if r.refs.Add(-1) == 0 && r.cur != nil {
		r.cur.Release()
	}
}

// Compile-time check that OCFReader satisfies array.RecordReader.
var _ array.RecordReader = (*OCFReader)(nil)