arrow/ipc/file_reader.go (697 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package ipc import ( "bytes" "encoding/binary" "errors" "fmt" "io" "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/array" "github.com/apache/arrow-go/v18/arrow/bitutil" "github.com/apache/arrow-go/v18/arrow/endian" "github.com/apache/arrow-go/v18/arrow/internal" "github.com/apache/arrow-go/v18/arrow/internal/dictutils" "github.com/apache/arrow-go/v18/arrow/internal/flatbuf" "github.com/apache/arrow-go/v18/arrow/memory" ) type readerImpl interface { getFooterEnd() (int64, error) getBytes(offset, length int64) ([]byte, error) dict(memory.Allocator, *footerBlock, int) (dataBlock, error) block(memory.Allocator, *footerBlock, int) (dataBlock, error) } type footerBlock struct { offset int64 buffer *memory.Buffer data *flatbuf.Footer } type dataBlock interface { Offset() int64 Meta() int32 Body() int64 NewMessage() (*Message, error) } const footerSizeLen = 4 var minimumOffsetSize = int64(len(Magic)*2 + footerSizeLen) type basicReaderImpl struct { r ReadAtSeeker } func (r *basicReaderImpl) getBytes(offset, len int64) ([]byte, error) { buf := make([]byte, len) n, err := r.r.ReadAt(buf, offset) if err != nil { return nil, fmt.Errorf("arrow/ipc: could not read %d bytes at offset %d: %w", len, offset, err) } if int64(n) != len { return nil, fmt.Errorf("arrow/ipc: could not read %d bytes at offset %d", len, offset) } return buf, nil } func (r *basicReaderImpl) getFooterEnd() (int64, error) { return r.r.Seek(0, io.SeekEnd) } func (r *basicReaderImpl) block(mem memory.Allocator, f *footerBlock, i int) (dataBlock, error) { var blk flatbuf.Block if !f.data.RecordBatches(&blk, i) { return fileBlock{}, fmt.Errorf("arrow/ipc: could not extract file block %d", i) } return fileBlock{ offset: blk.Offset(), meta: blk.MetaDataLength(), body: blk.BodyLength(), r: r.r, mem: mem, }, nil } func (r *basicReaderImpl) dict(mem memory.Allocator, f *footerBlock, i int) (dataBlock, error) { var blk flatbuf.Block if !f.data.Dictionaries(&blk, i) { return fileBlock{}, fmt.Errorf("arrow/ipc: could not extract dictionary block %d", i) } return fileBlock{ offset: blk.Offset(), meta: blk.MetaDataLength(), body: blk.BodyLength(), r: r.r, mem: mem, }, nil } type mappedReaderImpl struct { data []byte } func (r *mappedReaderImpl) getBytes(offset, length int64) ([]byte, error) { if offset < 0 || offset+int64(length) > int64(len(r.data)) { return nil, fmt.Errorf("arrow/ipc: invalid offset=%d or length=%d", offset, length) } return r.data[offset : offset+length], nil } func (r *mappedReaderImpl) getFooterEnd() (int64, error) { return int64(len(r.data)), nil } func (r *mappedReaderImpl) block(_ memory.Allocator, f *footerBlock, i int) (dataBlock, error) { var blk flatbuf.Block if !f.data.RecordBatches(&blk, i) { return mappedFileBlock{}, fmt.Errorf("arrow/ipc: could not extract file block %d", i) } return mappedFileBlock{ offset: blk.Offset(), meta: blk.MetaDataLength(), body: blk.BodyLength(), data: r.data, }, nil } func (r *mappedReaderImpl) dict(_ memory.Allocator, f *footerBlock, i int) (dataBlock, error) { var blk flatbuf.Block if !f.data.Dictionaries(&blk, i) { return mappedFileBlock{}, fmt.Errorf("arrow/ipc: could not extract dictionary block %d", i) } return mappedFileBlock{ offset: blk.Offset(), meta: blk.MetaDataLength(), body: blk.BodyLength(), data: r.data, }, nil } // FileReader is an Arrow file reader. type FileReader struct { r readerImpl footer footerBlock // fields dictTypeMap memo dictutils.Memo schema *arrow.Schema record arrow.Record irec int // current record index. used for the arrio.Reader interface err error // last error mem memory.Allocator swapEndianness bool } // NewMappedFileReader is like NewFileReader but instead of using a ReadAtSeeker, // which will force copies through the Read/ReadAt methods, it uses a byte slice // and pulls slices directly from the data. This is useful specifically when // dealing with mmapped data so that you can lazily load the buffers and avoid // extraneous copies. The slices used for the record column buffers will simply // reference the existing data instead of performing copies via ReadAt/Read. // // For example, syscall.Mmap returns a byte slice which could be referencing // a shared memory region or otherwise a memory-mapped file. func NewMappedFileReader(data []byte, opts ...Option) (*FileReader, error) { var ( cfg = newConfig(opts...) f = FileReader{ r: &mappedReaderImpl{data: data}, mem: cfg.alloc, } ) if err := f.init(cfg); err != nil { return nil, err } return &f, nil } // NewFileReader opens an Arrow file using the provided reader r. func NewFileReader(r ReadAtSeeker, opts ...Option) (*FileReader, error) { var ( cfg = newConfig(opts...) f = FileReader{ r: &basicReaderImpl{r: r}, memo: dictutils.NewMemo(), mem: cfg.alloc, } ) if err := f.init(cfg); err != nil { return nil, err } return &f, nil } func (f *FileReader) init(cfg *config) error { var err error if cfg.footer.offset <= 0 { cfg.footer.offset, err = f.r.getFooterEnd() if err != nil { return fmt.Errorf("arrow/ipc: could retrieve footer offset: %w", err) } } f.footer.offset = cfg.footer.offset err = f.readFooter() if err != nil { return fmt.Errorf("arrow/ipc: could not decode footer: %w", err) } err = f.readSchema(cfg.ensureNativeEndian) if err != nil { return fmt.Errorf("arrow/ipc: could not decode schema: %w", err) } if cfg.schema != nil && !cfg.schema.Equal(f.schema) { return fmt.Errorf("arrow/ipc: inconsistent schema for reading (got: %v, want: %v)", f.schema, cfg.schema) } return err } func (f *FileReader) readSchema(ensureNativeEndian bool) error { var ( err error kind dictutils.Kind ) schema := f.footer.data.Schema(nil) if schema == nil { return fmt.Errorf("arrow/ipc: could not load schema from flatbuffer data") } f.schema, err = schemaFromFB(schema, &f.memo) if err != nil { return fmt.Errorf("arrow/ipc: could not read schema: %w", err) } if ensureNativeEndian && !f.schema.IsNativeEndian() { f.swapEndianness = true f.schema = f.schema.WithEndianness(endian.NativeEndian) } for i := 0; i < f.NumDictionaries(); i++ { blk, err := f.r.dict(f.mem, &f.footer, i) if err != nil { return fmt.Errorf("arrow/ipc: could not read dictionary[%d]: %w", i, err) } switch { case !bitutil.IsMultipleOf8(blk.Offset()): return fmt.Errorf("arrow/ipc: invalid file offset=%d for dictionary %d", blk.Offset(), i) case !bitutil.IsMultipleOf8(int64(blk.Meta())): return fmt.Errorf("arrow/ipc: invalid file metadata=%d position for dictionary %d", blk.Meta(), i) case !bitutil.IsMultipleOf8(blk.Body()): return fmt.Errorf("arrow/ipc: invalid file body=%d position for dictionary %d", blk.Body(), i) } msg, err := blk.NewMessage() if err != nil { return err } kind, err = readDictionary(&f.memo, msg.meta, msg.body, f.swapEndianness, f.mem) if err != nil { return err } if kind == dictutils.KindReplacement { return errors.New("arrow/ipc: unsupported dictionary replacement in IPC file") } } return err } func (f *FileReader) readFooter() error { if f.footer.offset <= minimumOffsetSize { return fmt.Errorf("arrow/ipc: file too small (size=%d)", f.footer.offset) } eof := int64(len(Magic) + footerSizeLen) buf, err := f.r.getBytes(f.footer.offset-eof, eof) if err != nil { return err } if !bytes.Equal(buf[4:], Magic) { return errNotArrowFile } size := int64(binary.LittleEndian.Uint32(buf[:footerSizeLen])) if size <= 0 || size+minimumOffsetSize > f.footer.offset { return errInconsistentFileMetadata } buf, err = f.r.getBytes(f.footer.offset-size-eof, size) if err != nil { return err } f.footer.buffer = memory.NewBufferBytes(buf) f.footer.data = flatbuf.GetRootAsFooter(buf, 0) return nil } func (f *FileReader) Schema() *arrow.Schema { return f.schema } func (f *FileReader) NumDictionaries() int { if f.footer.data == nil { return 0 } return f.footer.data.DictionariesLength() } func (f *FileReader) NumRecords() int { return f.footer.data.RecordBatchesLength() } func (f *FileReader) Version() MetadataVersion { return MetadataVersion(f.footer.data.Version()) } // Close cleans up resources used by the File. // Close does not close the underlying reader. func (f *FileReader) Close() error { if f.footer.data != nil { f.footer.data = nil } if f.footer.buffer != nil { f.footer.buffer.Release() f.footer.buffer = nil } if f.record != nil { f.record.Release() f.record = nil } return nil } // Record returns the i-th record from the file. // The returned value is valid until the next call to Record. // Users need to call Retain on that Record to keep it valid for longer. func (f *FileReader) Record(i int) (arrow.Record, error) { record, err := f.RecordAt(i) if err != nil { return nil, err } if f.record != nil { f.record.Release() } f.record = record return record, nil } // Record returns the i-th record from the file. Ownership is transferred to the // caller and must call Release() to free the memory. This method is safe to // call concurrently. func (f *FileReader) RecordAt(i int) (arrow.Record, error) { if i < 0 || i > f.NumRecords() { panic("arrow/ipc: record index out of bounds") } blk, err := f.r.block(f.mem, &f.footer, i) if err != nil { return nil, err } switch { case !bitutil.IsMultipleOf8(blk.Offset()): return nil, fmt.Errorf("arrow/ipc: invalid file offset=%d for record %d", blk.Offset(), i) case !bitutil.IsMultipleOf8(int64(blk.Meta())): return nil, fmt.Errorf("arrow/ipc: invalid file metadata=%d position for record %d", blk.Meta(), i) case !bitutil.IsMultipleOf8(blk.Body()): return nil, fmt.Errorf("arrow/ipc: invalid file body=%d position for record %d", blk.Body(), i) } msg, err := blk.NewMessage() if err != nil { return nil, err } defer msg.Release() if msg.Type() != MessageRecordBatch { return nil, fmt.Errorf("arrow/ipc: message %d is not a Record", i) } return newRecord(f.schema, &f.memo, msg.meta, msg.body, f.swapEndianness, f.mem), nil } // Read reads the current record from the underlying stream and an error, if any. // When the Reader reaches the end of the underlying stream, it returns (nil, io.EOF). // // The returned record value is valid until the next call to Read. // Users need to call Retain on that Record to keep it valid for longer. func (f *FileReader) Read() (rec arrow.Record, err error) { if f.irec == f.NumRecords() { return nil, io.EOF } rec, f.err = f.Record(f.irec) f.irec++ return rec, f.err } // ReadAt reads the i-th record from the underlying stream and an error, if any. func (f *FileReader) ReadAt(i int64) (arrow.Record, error) { return f.Record(int(i)) } func newRecord(schema *arrow.Schema, memo *dictutils.Memo, meta *memory.Buffer, body *memory.Buffer, swapEndianness bool, mem memory.Allocator) arrow.Record { var ( msg = flatbuf.GetRootAsMessage(meta.Bytes(), 0) md flatbuf.RecordBatch codec decompressor ) initFB(&md, msg.Header) rows := md.Length() bodyCompress := md.Compression(nil) if bodyCompress != nil { codec = getDecompressor(bodyCompress.Codec()) defer codec.Close() } ctx := &arrayLoaderContext{ src: ipcSource{ meta: &md, rawBytes: body, codec: codec, mem: mem, }, memo: memo, max: kMaxNestingDepth, version: MetadataVersion(msg.Version()), } pos := dictutils.NewFieldPos() cols := make([]arrow.Array, schema.NumFields()) for i := 0; i < schema.NumFields(); i++ { data := ctx.loadArray(schema.Field(i).Type) defer data.Release() if err := dictutils.ResolveFieldDict(memo, data, pos.Child(int32(i)), mem); err != nil { panic(err) } if swapEndianness { swapEndianArrayData(data.(*array.Data)) } cols[i] = array.MakeFromData(data) defer cols[i].Release() } return array.NewRecord(schema, cols, rows) } type ipcSource struct { meta *flatbuf.RecordBatch rawBytes *memory.Buffer codec decompressor mem memory.Allocator } func (src *ipcSource) buffer(i int) *memory.Buffer { var buf flatbuf.Buffer if !src.meta.Buffers(&buf, i) { panic("arrow/ipc: buffer index out of bound") } if buf.Length() == 0 { return memory.NewBufferBytes(nil) } var raw *memory.Buffer if src.codec == nil { raw = memory.SliceBuffer(src.rawBytes, int(buf.Offset()), int(buf.Length())) } else { body := src.rawBytes.Bytes()[buf.Offset() : buf.Offset()+buf.Length()] uncompressedSize := int64(binary.LittleEndian.Uint64(body[:8])) // check for an uncompressed buffer if uncompressedSize != -1 { raw = memory.NewResizableBuffer(src.mem) raw.Resize(int(uncompressedSize)) src.codec.Reset(bytes.NewReader(body[8:])) if _, err := io.ReadFull(src.codec, raw.Bytes()); err != nil { panic(err) } } else { raw = memory.SliceBuffer(src.rawBytes, int(buf.Offset())+8, int(buf.Length())-8) } } return raw } func (src *ipcSource) fieldMetadata(i int) *flatbuf.FieldNode { var node flatbuf.FieldNode if !src.meta.Nodes(&node, i) { panic("arrow/ipc: field metadata out of bound") } return &node } func (src *ipcSource) variadicCount(i int) int64 { return src.meta.VariadicBufferCounts(i) } type arrayLoaderContext struct { src ipcSource ifield int ibuffer int ivariadic int max int memo *dictutils.Memo version MetadataVersion } func (ctx *arrayLoaderContext) field() *flatbuf.FieldNode { field := ctx.src.fieldMetadata(ctx.ifield) ctx.ifield++ return field } func (ctx *arrayLoaderContext) buffer() *memory.Buffer { buf := ctx.src.buffer(ctx.ibuffer) ctx.ibuffer++ return buf } func (ctx *arrayLoaderContext) variadic() int64 { v := ctx.src.variadicCount(ctx.ivariadic) ctx.ivariadic++ return v } func (ctx *arrayLoaderContext) loadArray(dt arrow.DataType) arrow.ArrayData { switch dt := dt.(type) { case *arrow.NullType: return ctx.loadNull() case *arrow.DictionaryType: indices := ctx.loadPrimitive(dt.IndexType) defer indices.Release() return array.NewData(dt, indices.Len(), indices.Buffers(), indices.Children(), indices.NullN(), indices.Offset()) case *arrow.BooleanType, *arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type, *arrow.Int64Type, *arrow.Uint8Type, *arrow.Uint16Type, *arrow.Uint32Type, *arrow.Uint64Type, *arrow.Float16Type, *arrow.Float32Type, *arrow.Float64Type, arrow.DecimalType, *arrow.Time32Type, *arrow.Time64Type, *arrow.TimestampType, *arrow.Date32Type, *arrow.Date64Type, *arrow.MonthIntervalType, *arrow.DayTimeIntervalType, *arrow.MonthDayNanoIntervalType, *arrow.DurationType: return ctx.loadPrimitive(dt) case *arrow.BinaryType, *arrow.StringType, *arrow.LargeStringType, *arrow.LargeBinaryType: return ctx.loadBinary(dt) case arrow.BinaryViewDataType: return ctx.loadBinaryView(dt) case *arrow.FixedSizeBinaryType: return ctx.loadFixedSizeBinary(dt) case *arrow.ListType: return ctx.loadList(dt) case *arrow.LargeListType: return ctx.loadList(dt) case *arrow.ListViewType: return ctx.loadListView(dt) case *arrow.LargeListViewType: return ctx.loadListView(dt) case *arrow.FixedSizeListType: return ctx.loadFixedSizeList(dt) case *arrow.StructType: return ctx.loadStruct(dt) case *arrow.MapType: return ctx.loadMap(dt) case arrow.ExtensionType: storage := ctx.loadArray(dt.StorageType()) defer storage.Release() return array.NewData(dt, storage.Len(), storage.Buffers(), storage.Children(), storage.NullN(), storage.Offset()) case *arrow.RunEndEncodedType: field, buffers := ctx.loadCommon(dt.ID(), 1) defer memory.ReleaseBuffers(buffers) runEnds := ctx.loadChild(dt.RunEnds()) defer runEnds.Release() values := ctx.loadChild(dt.Encoded()) defer values.Release() return array.NewData(dt, int(field.Length()), buffers, []arrow.ArrayData{runEnds, values}, int(field.NullCount()), 0) case arrow.UnionType: return ctx.loadUnion(dt) default: panic(fmt.Errorf("arrow/ipc: array type %T not handled yet", dt)) } } func (ctx *arrayLoaderContext) loadCommon(typ arrow.Type, nbufs int) (*flatbuf.FieldNode, []*memory.Buffer) { buffers := make([]*memory.Buffer, 0, nbufs) field := ctx.field() var buf *memory.Buffer if internal.HasValidityBitmap(typ, flatbuf.MetadataVersion(ctx.version)) { switch field.NullCount() { case 0: ctx.ibuffer++ default: buf = ctx.buffer() } } buffers = append(buffers, buf) return field, buffers } func (ctx *arrayLoaderContext) loadChild(dt arrow.DataType) arrow.ArrayData { if ctx.max == 0 { panic("arrow/ipc: nested type limit reached") } ctx.max-- sub := ctx.loadArray(dt) ctx.max++ return sub } func (ctx *arrayLoaderContext) loadNull() arrow.ArrayData { field := ctx.field() return array.NewData(arrow.Null, int(field.Length()), nil, nil, int(field.NullCount()), 0) } func (ctx *arrayLoaderContext) loadPrimitive(dt arrow.DataType) arrow.ArrayData { field, buffers := ctx.loadCommon(dt.ID(), 2) switch field.Length() { case 0: buffers = append(buffers, nil) ctx.ibuffer++ default: buffers = append(buffers, ctx.buffer()) } defer memory.ReleaseBuffers(buffers) return array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0) } func (ctx *arrayLoaderContext) loadBinary(dt arrow.DataType) arrow.ArrayData { field, buffers := ctx.loadCommon(dt.ID(), 3) buffers = append(buffers, ctx.buffer(), ctx.buffer()) defer memory.ReleaseBuffers(buffers) return array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0) } func (ctx *arrayLoaderContext) loadBinaryView(dt arrow.DataType) arrow.ArrayData { nVariadicBufs := ctx.variadic() field, buffers := ctx.loadCommon(dt.ID(), 2+int(nVariadicBufs)) buffers = append(buffers, ctx.buffer()) for i := 0; i < int(nVariadicBufs); i++ { buffers = append(buffers, ctx.buffer()) } defer memory.ReleaseBuffers(buffers) return array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0) } func (ctx *arrayLoaderContext) loadFixedSizeBinary(dt *arrow.FixedSizeBinaryType) arrow.ArrayData { field, buffers := ctx.loadCommon(dt.ID(), 2) buffers = append(buffers, ctx.buffer()) defer memory.ReleaseBuffers(buffers) return array.NewData(dt, int(field.Length()), buffers, nil, int(field.NullCount()), 0) } func (ctx *arrayLoaderContext) loadMap(dt *arrow.MapType) arrow.ArrayData { field, buffers := ctx.loadCommon(dt.ID(), 2) buffers = append(buffers, ctx.buffer()) defer memory.ReleaseBuffers(buffers) sub := ctx.loadChild(dt.Elem()) defer sub.Release() return array.NewData(dt, int(field.Length()), buffers, []arrow.ArrayData{sub}, int(field.NullCount()), 0) } func (ctx *arrayLoaderContext) loadList(dt arrow.ListLikeType) arrow.ArrayData { field, buffers := ctx.loadCommon(dt.ID(), 2) buffers = append(buffers, ctx.buffer()) defer memory.ReleaseBuffers(buffers) sub := ctx.loadChild(dt.Elem()) defer sub.Release() return array.NewData(dt, int(field.Length()), buffers, []arrow.ArrayData{sub}, int(field.NullCount()), 0) } func (ctx *arrayLoaderContext) loadListView(dt arrow.VarLenListLikeType) arrow.ArrayData { field, buffers := ctx.loadCommon(dt.ID(), 3) buffers = append(buffers, ctx.buffer(), ctx.buffer()) defer memory.ReleaseBuffers(buffers) sub := ctx.loadChild(dt.Elem()) defer sub.Release() return array.NewData(dt, int(field.Length()), buffers, []arrow.ArrayData{sub}, int(field.NullCount()), 0) } func (ctx *arrayLoaderContext) loadFixedSizeList(dt *arrow.FixedSizeListType) arrow.ArrayData { field, buffers := ctx.loadCommon(dt.ID(), 1) defer memory.ReleaseBuffers(buffers) sub := ctx.loadChild(dt.Elem()) defer sub.Release() return array.NewData(dt, int(field.Length()), buffers, []arrow.ArrayData{sub}, int(field.NullCount()), 0) } func (ctx *arrayLoaderContext) loadStruct(dt *arrow.StructType) arrow.ArrayData { field, buffers := ctx.loadCommon(dt.ID(), 1) defer memory.ReleaseBuffers(buffers) subs := make([]arrow.ArrayData, dt.NumFields()) for i, f := range dt.Fields() { subs[i] = ctx.loadChild(f.Type) } defer func() { for i := range subs { subs[i].Release() } }() return array.NewData(dt, int(field.Length()), buffers, subs, int(field.NullCount()), 0) } func (ctx *arrayLoaderContext) loadUnion(dt arrow.UnionType) arrow.ArrayData { // Sparse unions have 2 buffers (a nil validity bitmap, and the type ids) nBuffers := 2 // Dense unions have a third buffer, the offsets if dt.Mode() == arrow.DenseMode { nBuffers = 3 } field, buffers := ctx.loadCommon(dt.ID(), nBuffers) if field.NullCount() != 0 && buffers[0] != nil { panic("arrow/ipc: cannot read pre-1.0.0 union array with top-level validity bitmap") } switch field.Length() { case 0: buffers = append(buffers, memory.NewBufferBytes([]byte{})) ctx.ibuffer++ if dt.Mode() == arrow.DenseMode { buffers = append(buffers, nil) ctx.ibuffer++ } default: buffers = append(buffers, ctx.buffer()) if dt.Mode() == arrow.DenseMode { buffers = append(buffers, ctx.buffer()) } } defer memory.ReleaseBuffers(buffers) subs := make([]arrow.ArrayData, dt.NumFields()) for i, f := range dt.Fields() { subs[i] = ctx.loadChild(f.Type) } defer func() { for i := range subs { subs[i].Release() } }() return array.NewData(dt, int(field.Length()), buffers, subs, 0, 0) } func readDictionary(memo *dictutils.Memo, meta *memory.Buffer, body *memory.Buffer, swapEndianness bool, mem memory.Allocator) (dictutils.Kind, error) { var ( msg = flatbuf.GetRootAsMessage(meta.Bytes(), 0) md flatbuf.DictionaryBatch data flatbuf.RecordBatch codec decompressor ) initFB(&md, msg.Header) md.Data(&data) bodyCompress := data.Compression(nil) if bodyCompress != nil { codec = getDecompressor(bodyCompress.Codec()) defer codec.Close() } id := md.Id() // look up the dictionary value type, which must have been added to the // memo already before calling this function valueType, ok := memo.Type(id) if !ok { return 0, fmt.Errorf("arrow/ipc: no dictionary type found with id: %d", id) } ctx := &arrayLoaderContext{ src: ipcSource{ meta: &data, codec: codec, rawBytes: body, mem: mem, }, memo: memo, max: kMaxNestingDepth, } dict := ctx.loadArray(valueType) defer dict.Release() if swapEndianness { swapEndianArrayData(dict.(*array.Data)) } if md.IsDelta() { memo.AddDelta(id, dict) return dictutils.KindDelta, nil } if memo.AddOrReplace(id, dict) { return dictutils.KindNew, nil } return dictutils.KindReplacement, nil } type mappedFileBlock struct { offset int64 meta int32 body int64 data []byte } func (blk mappedFileBlock) Offset() int64 { return blk.offset } func (blk mappedFileBlock) Meta() int32 { return blk.meta } func (blk mappedFileBlock) Body() int64 { return blk.body } func (blk mappedFileBlock) section() []byte { return blk.data[blk.offset : blk.offset+int64(blk.meta)+blk.body] } func (blk mappedFileBlock) NewMessage() (*Message, error) { var ( body *memory.Buffer meta *memory.Buffer buf = blk.section() ) metaBytes := buf[:blk.meta] prefix := 0 switch binary.LittleEndian.Uint32(metaBytes) { case 0: case kIPCContToken: prefix = 8 default: // ARROW-6314: backwards compatibility for reading old IPC // messages produced prior to version 0.15.0 prefix = 4 } meta = memory.NewBufferBytes(metaBytes[prefix:]) body = memory.NewBufferBytes(buf[blk.meta : int64(blk.meta)+blk.body]) return NewMessage(meta, body), nil }