pkg/unpack/reader.go (155 lines of code) (raw):
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package unpack
import (
"context"
"errors"
"fmt"
"io"
"github.com/googlecloudplatform/pi-delivery/pkg/resultset"
"github.com/googlecloudplatform/pi-delivery/pkg/ycd"
)
// UpstreamReader is the source of packed digits that UnpackReader reads from.
// UnpackReader.Read uses its sequential io.ReadSeeker side, and
// UnpackReader.ReadAt uses its positional io.ReaderAt side.
type UpstreamReader interface {
	io.ReadSeeker
	io.ReaderAt
	// ResultSet returns the upstream result set. NewReader takes the radix,
	// total digit count and block size from it.
	ResultSet() resultset.ResultSet
}
// UnpackReader reads packed digits from an UpstreamReader and converts them
// to their unpacked string representation ("14159...").
//
// Offsets are digit offsets numbered as in the packed format, so offset 0 is
// still the first digit after the decimal point.
type UnpackReader struct {
	// radix is the number base of the digits, taken from the upstream result set.
	radix int
	// off is the digit offset at which the next Read starts.
	off int64
	// totalDigits bounds every read; offsets at or past it yield io.EOF.
	totalDigits int64
	// blockSize is the number of digits per packed block; unpacking never
	// crosses a block boundary in a single step.
	blockSize int64
	// rd is the upstream source of packed words.
	rd UpstreamReader
	// seeked is set by Seek when off changes, telling the next Read to
	// reposition rd and discard unread.
	seeked bool
	// unread is a copy of the final packed word of the previous Read, kept when
	// ToPackedOffsets reported a non-zero post (presumably digits still left in
	// that word) so the next Read can reuse it instead of re-reading upstream.
	unread []byte
}
// Compile-time checks that *UnpackReader implements the io interfaces it
// advertises.
var (
	_ io.ReadSeeker = (*UnpackReader)(nil)
	_ io.ReaderAt   = (*UnpackReader)(nil)
)

// ErrNotFullWord is returned when the number of packed bytes obtained from
// the upstream reader is not a multiple of WordSize.
var ErrNotFullWord = errors.New("read bytes are not full words")
// NewReader returns a new UnpackReader that unpacks digits read from rd.
//
// The radix, total digit count and block size are captured once from
// rd.ResultSet() at construction time. The reader starts at digit offset 0.
// ctx is currently unused.
func NewReader(ctx context.Context, rd UpstreamReader) *UnpackReader {
	rs := rd.ResultSet()
	return &UnpackReader{
		radix:       rs.Radix(),
		totalDigits: rs.TotalDigits(),
		blockSize:   rs.BlockSize(),
		rd:          rd,
	}
}
// ReadAt reads up to len(p) unpacked digits starting at the off-th digit,
// implementing io.ReaderAt. For decimal results, ReadAt(p, 0) returns
// "141592...".
//
// It returns io.EOF when off is at or past the last digit, or when fewer than
// len(p) digits remain. A negative off is rejected with an error. Following
// the io.ReaderAt contract, a non-nil error is always returned when fewer
// than len(p) digits are written.
//
// NOTE(review): YCD files start at the second digit after the decimal point;
// no special handling of digit 0 is visible here (presumably it lives in
// ToPackedOffsets or UnpackBlock) — confirm.
func (r *UnpackReader) ReadAt(p []byte, off int64) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}
	if off < 0 {
		return 0, errors.New("ReadAt: negative offset")
	}
	if off >= r.totalDigits {
		return 0, io.EOF
	}
	start, n, pre, _ := ToPackedOffsets(off, r.blockSize, int64(len(p)), ycd.DigitsPerWord(r.radix))
	packed := make([]byte, n)
	read, err := r.rd.ReadAt(packed, start)
	if read == 0 {
		return 0, err
	}
	// Only whole packed words can be unpacked.
	if read%WordSize != 0 {
		return 0, fmt.Errorf("read %v bytes: %w", read, ErrNotFullWord)
	}
	// Never unpack past the last digit of the result set. Clamping means the
	// caller reached the end, which io.ReaderAt reports as io.EOF.
	remaining := len(p)
	if left := r.totalDigits - off; int64(remaining) > left {
		remaining = int(left)
		err = io.EOF
	}
	written, perr := r.unpack(p[:remaining], packed[:read], off, pre)
	if perr != nil {
		return written, fmt.Errorf("unpack error at off %v: %w", off, perr)
	}
	// io.ReaderAt requires a non-nil error whenever n < len(p).
	if written < len(p) && err == nil {
		err = io.ErrUnexpectedEOF
	}
	return written, err
}
// Read reads up to len(p) unpacked digits starting at the current reader
// offset and advances the offset by the number of digits written,
// implementing io.Reader.
//
// After a Seek, the upstream reader is repositioned to the packed word that
// holds the new offset before reading. When ToPackedOffsets reports a
// non-zero post and the whole packed buffer was read, a copy of the last
// packed word is cached in r.unread. The next sequential Read then starts
// from that cached word instead of re-reading it upstream.
//
// It returns io.EOF once the upstream data is exhausted or the offset has
// reached the total digit count.
func (r *UnpackReader) Read(p []byte) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}
	if r.off >= r.totalDigits {
		return 0, io.EOF
	}
	written := 0
	read := 0
	dpw := ycd.DigitsPerWord(r.radix)
	start, packedN, pre, post := ToPackedOffsets(r.off, r.blockSize, int64(len(p)), dpw)
	if r.seeked {
		// Any cached word belongs to the old position; reposition upstream.
		if _, err := r.rd.Seek(start, io.SeekStart); err != nil {
			return written, err
		}
		r.unread = nil
		r.seeked = false
	}
	packed := make([]byte, packedN)
	if len(r.unread) > 0 {
		// Resume from the word cached by the previous Read; upstream is
		// already positioned just past it.
		read += copy(packed, r.unread)
		// NOTE(review): the 2*WordSize threshold is not explained here; the
		// cache is refreshed below whenever the full buffer is read.
		if post == 0 || packedN > 2*WordSize {
			r.unread = nil
		}
	}
	n, err := io.ReadFull(r.rd, packed[read:])
	read += n
	remaining := len(p)
	if left := r.totalDigits - r.off; int64(remaining) > left {
		remaining = int(left)
	}
	digits, perr := r.unpack(p[:remaining], packed[:read], r.off, pre)
	r.off += int64(digits)
	written += digits
	if read%WordSize != 0 {
		return written, fmt.Errorf("off %v, read bytes %v: %w", r.off, read, ErrNotFullWord)
	}
	if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
		return written, fmt.Errorf("read error at off %v: %w", r.off, err)
	}
	if perr != nil {
		// Best effort: the upstream position only enriches the error message.
		poff, _ := r.rd.Seek(0, io.SeekCurrent)
		return written, fmt.Errorf("unpack error at off %v, packed off %v: %w", r.off, poff, perr)
	}
	if int64(read) == packedN && post > 0 {
		r.unread = make([]byte, WordSize)
		copy(r.unread, packed[read-WordSize:])
	}
	// A partial final read from io.ReadFull is a normal end of stream here.
	if errors.Is(err, io.ErrUnexpectedEOF) {
		return written, io.EOF
	}
	return written, err
}
// Seek sets the digit offset for the next Read and returns the new offset.
// whence is io.SeekStart, io.SeekCurrent or io.SeekEnd; for io.SeekEnd the
// end is the total digit count.
//
// A resulting negative offset or an unknown whence is an error. In that case
// the offset is left unchanged and the current offset is returned.
// Repositioning the upstream reader is deferred to the next Read.
func (r *UnpackReader) Seek(offset int64, whence int) (int64, error) {
	var off int64
	switch whence {
	case io.SeekStart:
		off = offset
	case io.SeekCurrent:
		off = r.off + offset
	case io.SeekEnd:
		off = r.totalDigits + offset
	default:
		return r.off, errors.New("Seek: invalid whence")
	}
	if off < 0 {
		return r.off, errors.New("Seek: negative offset")
	}
	if off != r.off {
		r.off = off
		// The next Read must reposition upstream and drop any cached word.
		r.seeked = true
	}
	return off, nil
}
// unpack converts the packed words in packed into digits in unpacked. The
// first digit written is the one at digit offset offset. pre is passed to
// UnpackBlock for the first step only and is 0 afterwards (presumably the
// number of leading digits to skip in the first word — confirm against
// ToPackedOffsets).
//
// Work is split so that no single UnpackBlock call crosses a blockSize
// boundary. It returns the number of digits written. If packed ends before
// the words a step needs, it stops with an error wrapping
// io.ErrUnexpectedEOF. This avoids reslicing past len(packed): callers pass
// packed[:read] of a larger buffer, whose tail holds unread bytes.
func (r *UnpackReader) unpack(unpacked, packed []byte, offset int64, pre int) (int, error) {
	poff := 0
	written := 0
	dpw := ycd.DigitsPerWord(r.radix)
	for poff < len(packed) && written < len(unpacked) {
		// Limit this step to the digits left in the current block.
		reqDigits := len(unpacked) - written
		if left := r.blockSize - offset%r.blockSize; int64(reqDigits) > left {
			reqDigits = int(left)
		}
		reqBytes := (reqDigits + dpw - 1) / dpw * WordSize
		end := poff + reqBytes
		if end > len(packed) {
			return written, fmt.Errorf("need %v packed bytes at packed off %v, have %v: %w",
				reqBytes, poff, len(packed)-poff, io.ErrUnexpectedEOF)
		}
		n, err := UnpackBlock(unpacked[written:written+reqDigits], packed[poff:end], r.radix, pre)
		poff = end
		written += n
		offset += int64(n)
		if err != nil {
			return written, err
		}
		pre = 0
	}
	return written, nil
}