arrow/flight/record_batch_reader.go (172 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package flight import ( "bytes" "fmt" "sync/atomic" "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/array" "github.com/apache/arrow-go/v18/arrow/arrio" "github.com/apache/arrow-go/v18/arrow/internal/debug" "github.com/apache/arrow-go/v18/arrow/ipc" "github.com/apache/arrow-go/v18/arrow/memory" "github.com/apache/arrow-go/v18/internal/utils" ) // DataStreamReader is an interface for receiving flight data messages on a stream // such as via grpc with Arrow Flight. type DataStreamReader interface { Recv() (*FlightData, error) } type dataMessageReader struct { rdr DataStreamReader peeked *FlightData refCount atomic.Int64 msg *ipc.Message lastAppMetadata []byte descr *FlightDescriptor } func (d *dataMessageReader) Message() (*ipc.Message, error) { var ( fd *FlightData err error ) if d.peeked != nil { fd = d.peeked d.peeked = nil } else { fd, err = d.rdr.Recv() } if err != nil { if d.msg != nil { // clear the previous message in the error case d.msg.Release() d.msg = nil } d.lastAppMetadata = nil d.descr = nil return nil, err } d.lastAppMetadata = fd.AppMetadata d.descr = fd.FlightDescriptor d.msg = ipc.NewMessage(memory.NewBufferBytes(fd.DataHeader), memory.NewBufferBytes(fd.DataBody)) return d.msg, nil } func (d *dataMessageReader) Retain() { d.refCount.Add(1) } func (d *dataMessageReader) Release() { debug.Assert(d.refCount.Load() > 0, "too many releases") if d.refCount.Add(-1) == 0 { if d.msg != nil { d.msg.Release() d.msg = nil } d.lastAppMetadata = nil } } // Reader is an ipc.Reader which also keeps track of the metadata from // the FlightData messages as they come in, calling LatestAppMetadata // will return the metadata bytes from the most recently read message. type Reader struct { *ipc.Reader dmr *dataMessageReader } // Retain increases the reference count for the underlying message reader // and ipc.Reader which are utilized by this Reader. func (r *Reader) Retain() { r.Reader.Retain() r.dmr.Retain() } // Release reduces the reference count for the underlying message reader // and ipc.Reader, when the reference counts become zero, the allocated // memory is released for the stored record and metadata. func (r *Reader) Release() { r.Reader.Release() r.dmr.Release() } // LatestAppMetadata returns the bytes from the AppMetadata field of the // most recently read FlightData message that was processed by calling // the Next function. The metadata returned would correspond to the record // retrieved by calling Record(). func (r *Reader) LatestAppMetadata() []byte { return r.dmr.lastAppMetadata } // LatestFlightDescriptor returns a pointer to the last FlightDescriptor object // that was received in the most recently read FlightData message that was // processed by calling the Next function. The descriptor returned would correspond // to the record retrieved by calling Record(). func (r *Reader) LatestFlightDescriptor() *FlightDescriptor { return r.dmr.descr } // Chunk is a convenience function to return a chunk of the flight stream // returning the RecordBatch along with the FlightDescriptor and any AppMetadata. // Each of these can be retrieved separately with their respective functions, // this is just a convenience to retrieve all three with one function call. func (r *Reader) Chunk() StreamChunk { return StreamChunk{ Data: r.Record(), Desc: r.dmr.descr, AppMetadata: r.dmr.lastAppMetadata, } } // NewRecordReader constructs an ipc reader using the flight data stream reader // as the source of the ipc messages, opts passed will be passed to the underlying // ipc.Reader such as ipc.WithSchema and ipc.WithAllocator func NewRecordReader(r DataStreamReader, opts ...ipc.Option) (*Reader, error) { // peek the first message for a descriptor data, err := r.Recv() if err != nil { return nil, err } rdr := &Reader{dmr: &dataMessageReader{rdr: r}} rdr.dmr.refCount.Add(1) rdr.dmr.descr = data.FlightDescriptor if len(data.DataHeader) > 0 { rdr.dmr.peeked = data } rdr.dmr.Retain() if rdr.Reader, err = ipc.NewReaderFromMessageReader(rdr.dmr, opts...); err != nil { return nil, fmt.Errorf("arrow/flight: could not create flight reader: %w", err) } return rdr, nil } // DeserializeSchema takes the schema bytes from FlightInfo or SchemaResult // and returns the deserialized arrow schema. func DeserializeSchema(info []byte, mem memory.Allocator) (*arrow.Schema, error) { // even though the Flight proto file says that the bytes should be the // flatbuffer message as per Schema.fbs, the current implementations send // a serialized recordbatch with no body rows rather than just the // schema message. So let's make sure to follow that. rdr, err := ipc.NewReader(bytes.NewReader(info), ipc.WithAllocator(mem)) if err != nil { return nil, err } defer rdr.Release() return rdr.Schema(), nil } // StreamChunk represents a single chunk of a FlightData stream type StreamChunk struct { Data arrow.Record Desc *FlightDescriptor AppMetadata []byte Err error } // MessageReader is an interface representing a RecordReader // that also provides StreamChunks and/or the ability to retrieve // FlightDescriptors and AppMetadata from the flight stream type MessageReader interface { array.RecordReader arrio.Reader Err() error Chunk() StreamChunk LatestFlightDescriptor() *FlightDescriptor LatestAppMetadata() []byte } type haserr interface { Err() error } // StreamChunksFromReader is a convenience function to populate a channel // from a record reader. It is intended to be run using a separate goroutine // by calling `go flight.StreamChunksFromReader(rdr, ch)`. // // If the record reader panics, an error chunk will get sent on the channel. // // This will close the channel and release the reader when it completes. func StreamChunksFromReader(rdr array.RecordReader, ch chan<- StreamChunk) { defer close(ch) defer func() { if err := recover(); err != nil { ch <- StreamChunk{Err: utils.FormatRecoveredError("panic while reading", err)} } }() defer rdr.Release() for rdr.Next() { rec := rdr.Record() rec.Retain() ch <- StreamChunk{Data: rec} } if e, ok := rdr.(haserr); ok { if e.Err() != nil { ch <- StreamChunk{Err: e.Err()} } } } func ConcatenateReaders(rdrs []array.RecordReader, ch chan<- StreamChunk) { defer close(ch) defer func() { for _, r := range rdrs { r.Release() } if err := recover(); err != nil { ch <- StreamChunk{Err: utils.FormatRecoveredError("panic while reading", err)} } }() for _, r := range rdrs { for r.Next() { rec := r.Record() rec.Retain() ch <- StreamChunk{Data: rec} } if e, ok := r.(haserr); ok { if e.Err() != nil { ch <- StreamChunk{Err: e.Err()} return } } } }