pkg/resultset/reader.go (88 lines of code) (raw):

// Copyright 2022 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package resultset import ( "context" "errors" "io" "github.com/googlecloudplatform/pi-delivery/pkg/obj" ) // Reader is a reader for a ResultSet, starting at the first figit (offset = 0) // to the end of the ResultSet. Reader automatically switches to the next object // as necessary. Alternatively you can also use ReadAt to read a section of ResultSet. // Must be created by NewReader() and the caller must Close() after use. type Reader struct { set ResultSet bucket obj.Bucket off int64 rd io.ReadCloser seeked bool } // Reader implements both io.ReaderAt and io.ReadSeekCloser var _ io.ReadSeekCloser = new(Reader) var _ io.ReaderAt = new(Reader) func readOnce(set ResultSet, bucket obj.Bucket, p []byte, off int64) (int, error) { reader, err := newRangeReader(context.Background(), set, bucket, off, int64(len(p))) if err != nil { return 0, err } defer reader.Close() return io.ReadFull(reader, p) } // ReadAt reads len(p) bytes of packed digits starting at byte result offset // (first byte in the result set is 0). // Returns io.EOF at the end of the result set. func (r *Reader) ReadAt(p []byte, off int64) (int, error) { n := 0 for n < len(p) { read, err := readOnce(r.set, r.bucket, p[n:], off+int64(n)) n += read if err == io.ErrUnexpectedEOF { continue } if err != nil { return n, err } } return n, nil } // Read reads len(p) bytes of packed digits at the current position. // Read returns at the end of each block with error == nil. // Callers should continue to call Read() if it needs more digits. func (r *Reader) Read(p []byte) (int, error) { if r.rd == nil || r.seeked { if err := r.Close(); err != nil { return 0, err } reader, err := newRangeReader(context.Background(), r.set, r.bucket, r.off, -1) r.rd = reader r.seeked = false if err != nil { return 0, err } } n, err := r.rd.Read(p) r.off += int64(n) if err == io.EOF { // Next Read() call needs to recreate the reader. r.seeked = true // Ignore EOF because there might be more data. return n, nil } return n, err } // Seek sets the byte result offset for the next Read(). func (r *Reader) Seek(offset int64, whence int) (int64, error) { off := r.off switch whence { case io.SeekStart: off = offset case io.SeekCurrent: off += offset case io.SeekEnd: off = r.set.TotalByteLength() + offset } if off < 0 { return r.off, errors.New("Seek: negative offset") } if r.off != off { r.off = off r.seeked = true } return off, nil } // Close closes the Reader. func (r *Reader) Close() error { if r.rd != nil { err := r.rd.Close() r.rd = nil return err } return nil } // ResultSet returns the underlying ResultSet. func (r *Reader) ResultSet() ResultSet { return r.set }