parquet/internal/encoding/byte_stream_split.go (287 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package encoding
import (
"fmt"
"math"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/parquet"
"github.com/apache/arrow-go/v18/parquet/internal/debug"
"golang.org/x/xerrors"
)
// encodeByteStreamSplit scatters the fixed-width values packed in 'in' into 'data'
// using the BYTE_STREAM_SPLIT layout: byte k of every value is written contiguously
// to stream k, and the 'width' streams are stored back to back.
//
// The number of values is len(in)/width; trailing bytes of 'in' that do not form a
// complete value are not copied. 'data' must have room for at least len(in) bytes.
func encodeByteStreamSplit(data []byte, in []byte, width int) {
	debug.Assert(len(data) >= len(in), fmt.Sprintf("not enough space in destination buffer for encoding, dest: %d bytes, src: %d bytes", len(data), len(in)))

	count := len(in) / width
	for s := 0; s < width; s++ {
		// Stream s starts at data[base]; its source bytes sit at offsets
		// s, s+width, s+2*width, ... of the input.
		base := count * s
		for i, src := 0, s; i < count; i, src = i+1, src+width {
			data[base+i] = in[src]
		}
	}
}
// encodeByteStreamSplitWidth2 is the 2-byte specialisation of encodeByteStreamSplit:
// the first byte of every value goes to the first stream and the second byte to the
// second stream, which begins len(in)/2 bytes into 'data'.
// 'data' must have room for at least len(in) bytes.
func encodeByteStreamSplitWidth2(data []byte, in []byte) {
	debug.Assert(len(data) >= len(in), fmt.Sprintf("not enough space in destination buffer for encoding, dest: %d bytes, src: %d bytes", len(data), len(in)))

	const width = 2
	count := len(in) / width
	// src tracks the offset of value i inside the input.
	for i, src := 0, 0; i < count; i, src = i+1, src+width {
		data[i] = in[src]
		data[count+i] = in[src+1]
	}
}
// encodeByteStreamSplitWidth4 is the 4-byte specialisation of encodeByteStreamSplit.
// Byte k of value i is stored at data[k*count+i], where count = len(in)/4.
// 'data' must have room for at least len(in) bytes.
func encodeByteStreamSplitWidth4(data []byte, in []byte) {
	debug.Assert(len(data) >= len(in), fmt.Sprintf("not enough space in destination buffer for encoding, dest: %d bytes, src: %d bytes", len(data), len(in)))

	const width = 4
	count := len(in) / width
	// src tracks the offset of value i inside the input.
	for i, src := 0, 0; i < count; i, src = i+1, src+width {
		data[i] = in[src]
		data[count+i] = in[src+1]
		data[2*count+i] = in[src+2]
		data[3*count+i] = in[src+3]
	}
}
// encodeByteStreamSplitWidth8 is the 8-byte specialisation of encodeByteStreamSplit.
// Byte k of value i is stored at data[k*count+i], where count = len(in)/8.
// 'data' must have room for at least len(in) bytes.
func encodeByteStreamSplitWidth8(data []byte, in []byte) {
	debug.Assert(len(data) >= len(in), fmt.Sprintf("not enough space in destination buffer for encoding, dest: %d bytes, src: %d bytes", len(data), len(in)))

	const width = 8
	count := len(in) / width
	// src tracks the offset of value i inside the input.
	for i, src := 0, 0; i < count; i, src = i+1, src+width {
		data[i] = in[src]
		data[count+i] = in[src+1]
		data[2*count+i] = in[src+2]
		data[3*count+i] = in[src+3]
		data[4*count+i] = in[src+4]
		data[5*count+i] = in[src+5]
		data[6*count+i] = in[src+6]
		data[7*count+i] = in[src+7]
	}
}
// decodeByteStreamSplitBatchWidth4 reassembles nValues 4-byte values from the
// BYTE_STREAM_SPLIT-encoded bytes in 'data' and writes them contiguously to 'out'.
//
// Byte k of value i is read from data[k*stride+i], so 'stride' is the distance
// between the starts of consecutive byte streams. 'out' must have room for at least
// nValues*4 bytes.
func decodeByteStreamSplitBatchWidth4(data []byte, nValues, stride int, out []byte) {
	const width = 4
	debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data)))

	// dst tracks the offset of value i inside the output.
	for i, dst := 0, 0; i < nValues; i, dst = i+1, dst+width {
		out[dst] = data[i]
		out[dst+1] = data[stride+i]
		out[dst+2] = data[2*stride+i]
		out[dst+3] = data[3*stride+i]
	}
}
// decodeByteStreamSplitBatchWidth8 reassembles nValues 8-byte values from the
// BYTE_STREAM_SPLIT-encoded bytes in 'data' and writes them contiguously to 'out'.
//
// Byte k of value i is read from data[k*stride+i], so 'stride' is the distance
// between the starts of consecutive byte streams. 'out' must have room for at least
// nValues*8 bytes.
func decodeByteStreamSplitBatchWidth8(data []byte, nValues, stride int, out []byte) {
	const width = 8
	debug.Assert(len(out) >= nValues*width, fmt.Sprintf("not enough space in output buffer for decoding, out: %d bytes, data: %d bytes", len(out), len(data)))

	// dst tracks the offset of value i inside the output.
	for i, dst := 0, 0; i < nValues; i, dst = i+1, dst+width {
		out[dst] = data[i]
		out[dst+1] = data[stride+i]
		out[dst+2] = data[2*stride+i]
		out[dst+3] = data[3*stride+i]
		out[dst+4] = data[4*stride+i]
		out[dst+5] = data[5*stride+i]
		out[dst+6] = data[6*stride+i]
		out[dst+7] = data[7*stride+i]
	}
}
// decodeByteStreamSplitBatchFLBA reassembles nValues FixedLenByteArray values of
// 'width' bytes each from the BYTE_STREAM_SPLIT-encoded bytes in 'data'.
//
// Byte k of value i is read from data[k*stride+i] and stored in out[i][k]; the
// values are filled one byte stream at a time. 'out' must hold at least nValues
// entries, each indexable up to width-1.
func decodeByteStreamSplitBatchFLBA(data []byte, nValues, stride, width int, out []parquet.FixedLenByteArray) {
	debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues))

	for s := 0; s < width; s++ {
		// Offset of stream s within the encoded data.
		base := stride * s
		for i := 0; i < nValues; i++ {
			out[i][s] = data[base+i]
		}
	}
}
// decodeByteStreamSplitBatchFLBAWidth2 is the 2-byte specialisation of
// decodeByteStreamSplitBatchFLBA: out[i][0] is read from the first stream and
// out[i][1] from the second, which starts 'stride' bytes into 'data'.
// 'out' must hold at least nValues entries.
func decodeByteStreamSplitBatchFLBAWidth2(data []byte, nValues, stride int, out []parquet.FixedLenByteArray) {
	debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues))

	// second tracks the position of value i inside the second byte stream.
	for i, second := 0, stride; i < nValues; i, second = i+1, second+1 {
		out[i][0] = data[i]
		out[i][1] = data[second]
	}
}
// decodeByteStreamSplitBatchFLBAWidth4 is the 4-byte specialisation of
// decodeByteStreamSplitBatchFLBA: out[i][k] is read from data[k*stride+i].
// 'out' must hold at least nValues entries.
func decodeByteStreamSplitBatchFLBAWidth4(data []byte, nValues, stride int, out []parquet.FixedLenByteArray) {
	debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues))

	// Starting offsets of byte streams 1..3; stream 0 starts at offset 0.
	s1, s2, s3 := stride, 2*stride, 3*stride
	for i := 0; i < nValues; i++ {
		out[i][0] = data[i]
		out[i][1] = data[s1+i]
		out[i][2] = data[s2+i]
		out[i][3] = data[s3+i]
	}
}
// decodeByteStreamSplitBatchFLBAWidth8 is the 8-byte specialisation of
// decodeByteStreamSplitBatchFLBA: out[i][k] is read from data[k*stride+i].
// 'out' must hold at least nValues entries.
func decodeByteStreamSplitBatchFLBAWidth8(data []byte, nValues, stride int, out []parquet.FixedLenByteArray) {
	debug.Assert(len(out) >= nValues, fmt.Sprintf("not enough space in output slice for decoding, out: %d values, data: %d values", len(out), nValues))

	// Starting offsets of byte streams 1..7; stream 0 starts at offset 0.
	s1, s2, s3, s4 := stride, 2*stride, 3*stride, 4*stride
	s5, s6, s7 := 5*stride, 6*stride, 7*stride
	for i := 0; i < nValues; i++ {
		out[i][0] = data[i]
		out[i][1] = data[s1+i]
		out[i][2] = data[s2+i]
		out[i][3] = data[s3+i]
		out[i][4] = data[s4+i]
		out[i][5] = data[s5+i]
		out[i][6] = data[s6+i]
		out[i][7] = data[s7+i]
	}
}
// releaseBufferToPool returns the buffer owned by 'pooled' to the package-level
// bufferPool. Before handing it back, the bytes returned by buf.Buf() are set to
// zero and the buffer is resized to length 0 via ResizeNoShrink.
//
// A nil 'pooled' is ignored. The BYTE_STREAM_SPLIT encoders create their flush
// buffer lazily in FlushValues and reset it to nil in Release, so Release can be
// reached with no buffer (before the first flush, or on a repeated Release);
// without this guard that call would dereference a nil pointer.
func releaseBufferToPool(pooled *PooledBufferWriter) {
	if pooled == nil {
		return
	}

	buf := pooled.buf
	// Clear the contents before pooling — presumably so the next user of the
	// buffer never observes stale page data.
	memory.Set(buf.Buf(), 0)
	buf.ResizeNoShrink(0)
	bufferPool.Put(buf)
}
// validateByteStreamSplitPageData checks that a BYTE_STREAM_SPLIT page of len(data)
// bytes is consistent with values of typeLen bytes each, and returns the number of
// values actually stored in the page (len(data) / typeLen).
//
// nvals is the caller's expected value count. It is allowed to exceed the number of
// values present in data; the returned count is derived from the data size alone.
//
// An error is returned when:
//   - typeLen is not positive (guards the modulo/division below, which would
//     otherwise panic on a zero width);
//   - data holds more bytes than nvals values of typeLen bytes can account for;
//   - len(data) is not a multiple of typeLen.
func validateByteStreamSplitPageData(typeLen, nvals int, data []byte) (int, error) {
	if typeLen <= 0 {
		return 0, fmt.Errorf("ByteStreamSplit byte_width must be positive, found: %d", typeLen)
	}

	// NOTE(review): the message says "too small", but this condition rejects pages
	// whose data is larger than nvals*typeLen. The wording is left untouched because
	// callers or tests may match on it.
	if nvals*typeLen < len(data) {
		return 0, fmt.Errorf("data size (%d) is too small for the number of values in in BYTE_STREAM_SPLIT (%d)", len(data), nvals)
	}

	if len(data)%typeLen != 0 {
		return 0, fmt.Errorf("ByteStreamSplit data size %d not aligned with byte_width: %d", len(data), typeLen)
	}

	return len(data) / typeLen, nil
}
// ByteStreamSplitFloat32Encoder encodes Float32 values with the BYTE_STREAM_SPLIT
// encoding. Values are collected by the embedded PlainFloat32Encoder; their bytes
// are rearranged into per-byte streams when the encoder is flushed.
type ByteStreamSplitFloat32Encoder struct {
	PlainFloat32Encoder
	// flushBuffer receives the stream-split output; it is created on the first
	// FlushValues call and reset to nil by Release.
	flushBuffer *PooledBufferWriter
}

// FlushValues flushes the embedded plain encoder, re-lays its bytes out as four
// byte streams in flushBuffer, and returns the result of flushBuffer.Finish().
// An error from the plain encoder is returned unchanged with a nil Buffer.
func (enc *ByteStreamSplitFloat32Encoder) FlushValues() (Buffer, error) {
	plain, err := enc.PlainFloat32Encoder.FlushValues()
	if err != nil {
		return nil, err
	}

	size := plain.Len()
	if enc.flushBuffer == nil {
		enc.flushBuffer = NewPooledBufferWriter(size)
	}

	dst := enc.flushBuffer
	dst.buf.Resize(size)
	encodeByteStreamSplitWidth4(dst.Bytes(), plain.Bytes())
	return dst.Finish(), nil
}

// Release releases the embedded plain encoder, hands flushBuffer to
// releaseBufferToPool, and clears the field.
func (enc *ByteStreamSplitFloat32Encoder) Release() {
	enc.PlainFloat32Encoder.Release()

	pending := enc.flushBuffer
	releaseBufferToPool(pending)
	enc.flushBuffer = nil
}
// ByteStreamSplitFloat64Encoder encodes Float64 values with the BYTE_STREAM_SPLIT
// encoding. Values are collected by the embedded PlainFloat64Encoder; their bytes
// are rearranged into per-byte streams when the encoder is flushed.
type ByteStreamSplitFloat64Encoder struct {
	PlainFloat64Encoder
	// flushBuffer receives the stream-split output; it is created on the first
	// FlushValues call and reset to nil by Release.
	flushBuffer *PooledBufferWriter
}

// FlushValues flushes the embedded plain encoder, re-lays its bytes out as eight
// byte streams in flushBuffer, and returns the result of flushBuffer.Finish().
// An error from the plain encoder is returned unchanged with a nil Buffer.
func (enc *ByteStreamSplitFloat64Encoder) FlushValues() (Buffer, error) {
	plain, err := enc.PlainFloat64Encoder.FlushValues()
	if err != nil {
		return nil, err
	}

	size := plain.Len()
	if enc.flushBuffer == nil {
		enc.flushBuffer = NewPooledBufferWriter(size)
	}

	dst := enc.flushBuffer
	dst.buf.Resize(size)
	encodeByteStreamSplitWidth8(dst.Bytes(), plain.Bytes())
	return dst.Finish(), nil
}

// Release releases the embedded plain encoder, hands flushBuffer to
// releaseBufferToPool, and clears the field.
func (enc *ByteStreamSplitFloat64Encoder) Release() {
	enc.PlainFloat64Encoder.Release()

	pending := enc.flushBuffer
	releaseBufferToPool(pending)
	enc.flushBuffer = nil
}
// ByteStreamSplitInt32Encoder encodes Int32 values with the BYTE_STREAM_SPLIT
// encoding. Values are collected by the embedded PlainInt32Encoder; their bytes
// are rearranged into per-byte streams when the encoder is flushed.
type ByteStreamSplitInt32Encoder struct {
	PlainInt32Encoder
	// flushBuffer receives the stream-split output; it is created on the first
	// FlushValues call and reset to nil by Release.
	flushBuffer *PooledBufferWriter
}

// FlushValues flushes the embedded plain encoder, re-lays its bytes out as four
// byte streams in flushBuffer, and returns the result of flushBuffer.Finish().
// An error from the plain encoder is returned unchanged with a nil Buffer.
func (enc *ByteStreamSplitInt32Encoder) FlushValues() (Buffer, error) {
	plain, err := enc.PlainInt32Encoder.FlushValues()
	if err != nil {
		return nil, err
	}

	size := plain.Len()
	if enc.flushBuffer == nil {
		enc.flushBuffer = NewPooledBufferWriter(size)
	}

	dst := enc.flushBuffer
	dst.buf.Resize(size)
	encodeByteStreamSplitWidth4(dst.Bytes(), plain.Bytes())
	return dst.Finish(), nil
}

// Release releases the embedded plain encoder, hands flushBuffer to
// releaseBufferToPool, and clears the field.
func (enc *ByteStreamSplitInt32Encoder) Release() {
	enc.PlainInt32Encoder.Release()

	pending := enc.flushBuffer
	releaseBufferToPool(pending)
	enc.flushBuffer = nil
}
// ByteStreamSplitInt64Encoder encodes Int64 values with the BYTE_STREAM_SPLIT
// encoding. Values are collected by the embedded PlainInt64Encoder; their bytes
// are rearranged into per-byte streams when the encoder is flushed.
type ByteStreamSplitInt64Encoder struct {
	PlainInt64Encoder
	// flushBuffer receives the stream-split output; it is created on the first
	// FlushValues call and reset to nil by Release.
	flushBuffer *PooledBufferWriter
}

// FlushValues flushes the embedded plain encoder, re-lays its bytes out as eight
// byte streams in flushBuffer, and returns the result of flushBuffer.Finish().
// An error from the plain encoder is returned unchanged with a nil Buffer.
func (enc *ByteStreamSplitInt64Encoder) FlushValues() (Buffer, error) {
	plain, err := enc.PlainInt64Encoder.FlushValues()
	if err != nil {
		return nil, err
	}

	size := plain.Len()
	if enc.flushBuffer == nil {
		enc.flushBuffer = NewPooledBufferWriter(size)
	}

	dst := enc.flushBuffer
	dst.buf.Resize(size)
	encodeByteStreamSplitWidth8(dst.Bytes(), plain.Bytes())
	return dst.Finish(), nil
}

// Release releases the embedded plain encoder, hands flushBuffer to
// releaseBufferToPool, and clears the field.
func (enc *ByteStreamSplitInt64Encoder) Release() {
	enc.PlainInt64Encoder.Release()

	pending := enc.flushBuffer
	releaseBufferToPool(pending)
	enc.flushBuffer = nil
}
type (
	// ByteStreamSplitFloat32Decoder decodes BYTE_STREAM_SPLIT-encoded
	// bytes into Float32 values.
	ByteStreamSplitFloat32Decoder = ByteStreamSplitDecoder[float32]

	// ByteStreamSplitFloat64Decoder decodes BYTE_STREAM_SPLIT-encoded
	// bytes into Float64 values.
	ByteStreamSplitFloat64Decoder = ByteStreamSplitDecoder[float64]

	// ByteStreamSplitInt32Decoder decodes BYTE_STREAM_SPLIT-encoded
	// bytes into Int32 values.
	ByteStreamSplitInt32Decoder = ByteStreamSplitDecoder[int32]

	// ByteStreamSplitInt64Decoder decodes BYTE_STREAM_SPLIT-encoded
	// bytes into Int64 values.
	ByteStreamSplitInt64Decoder = ByteStreamSplitDecoder[int64]
)

// ByteStreamSplitDecoder is the generic decoder for BYTE_STREAM_SPLIT-encoded
// pages of 4- and 8-byte numeric values. It builds on the embedded decoder for the
// shared page state and adds the stream stride needed to locate each byte of a value.
type ByteStreamSplitDecoder[T float32 | float64 | int32 | int64] struct {
	decoder
	// stride is the distance in bytes between the starts of consecutive byte
	// streams. SetData sets it to the number of values stored in the page.
	stride int
}
// Type reports the parquet physical type matching the decoder's element type T:
// Int32, Int64, Float or Double. It panics for any other instantiation, which the
// type constraint on ByteStreamSplitDecoder does not permit.
func (dec *ByteStreamSplitDecoder[T]) Type() parquet.Type {
	switch self := any(dec).(type) {
	case *ByteStreamSplitDecoder[int32]:
		return parquet.Types.Int32
	case *ByteStreamSplitDecoder[int64]:
		return parquet.Types.Int64
	case *ByteStreamSplitDecoder[float32]:
		return parquet.Types.Float
	case *ByteStreamSplitDecoder[float64]:
		return parquet.Types.Double
	default:
		panic(fmt.Sprintf("ByteStreamSplitDecoder is not supported for type: %T", self))
	}
}
// SetData installs a new encoded page on the decoder.
//
// The page is first checked by validateByteStreamSplitPageData against the byte
// width of T. The value count it returns (derived from len(data)) replaces the
// caller-supplied nvals: it becomes the stream stride and is the count passed on to
// the embedded decoder's SetData. A validation error is returned without touching
// the decoder state.
func (dec *ByteStreamSplitDecoder[T]) SetData(nvals int, data []byte) error {
	width := dec.Type().ByteSize()

	stored, err := validateByteStreamSplitPageData(width, nvals, data)
	if err != nil {
		return err
	}

	dec.stride = stored
	return dec.decoder.SetData(stored, data)
}
// Discard skips up to n values without decoding them and returns how many were
// skipped, which is capped at the number of values remaining.
//
// The data window advances by one byte per skipped value rather than by the value
// width: the decode kernels address byte k of a value at k*stride from the start
// of the window, so moving the window by one byte moves every stream by one value.
func (dec *ByteStreamSplitDecoder[T]) Discard(n int) (int, error) {
	skipped := min(n, dec.nvals)

	dec.nvals -= skipped
	dec.data = dec.data[skipped:]
	return skipped, nil
}
// Decode fills 'out' with up to len(out) values and returns the number decoded,
// which is capped at the number of values remaining in the page.
//
// It returns an eof error when the remaining data is shorter than the bytes needed
// for the batch (or that byte count exceeds math.MaxInt32), and an error when the
// byte width of T is neither 4 nor 8. On success the data window is advanced by one
// byte per decoded value, matching the strided addressing of the decode kernels.
func (dec *ByteStreamSplitDecoder[T]) Decode(out []T) (int, error) {
	width := dec.Type().ByteSize()
	count := min(len(out), dec.nvals)

	if need := count * width; need > len(dec.data) || need > math.MaxInt32 {
		return 0, xerrors.New("parquet: eof exception")
	}

	// View the typed output as raw bytes so the kernels can write into it directly.
	dst := arrow.GetBytes(out)
	switch width {
	case 8:
		decodeByteStreamSplitBatchWidth8(dec.data, count, dec.stride, dst)
	case 4:
		decodeByteStreamSplitBatchWidth4(dec.data, count, dec.stride, dst)
	default:
		return 0, fmt.Errorf("encoding ByteStreamSplit is only defined for numeric type of width 4 or 8, found: %d", width)
	}

	dec.nvals -= count
	dec.data = dec.data[count:]
	return count, nil
}
// DecodeSpaced decodes len(out)-nullCount values into the front of 'out' and then
// calls spacedExpand with the validity bitmap to spread them over the full slice,
// returning spacedExpand's result as the count.
//
// A Decode error is returned as-is; decoding fewer values than expected is reported
// as a mismatch error. In both cases the count returned is what Decode produced.
func (dec *ByteStreamSplitDecoder[T]) DecodeSpaced(out []T, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
	expected := len(out) - nullCount

	got, err := dec.Decode(out[:expected])
	switch {
	case err != nil:
		return got, err
	case got != expected:
		return got, xerrors.New("parquet: number of values / definitions levels read did not match")
	}

	return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
}