parquet/metadata/page_index.go (720 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metadata
import (
"fmt"
"io"
"math"
"sync"
"github.com/apache/arrow-go/v18/arrow"
shared_utils "github.com/apache/arrow-go/v18/internal/utils"
"github.com/apache/arrow-go/v18/parquet"
"github.com/apache/arrow-go/v18/parquet/internal/debug"
"github.com/apache/arrow-go/v18/parquet/internal/encoding"
"github.com/apache/arrow-go/v18/parquet/internal/encryption"
format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet"
"github.com/apache/arrow-go/v18/parquet/internal/thrift"
"github.com/apache/arrow-go/v18/parquet/internal/utils"
"github.com/apache/arrow-go/v18/parquet/schema"
)
// BoundaryOrder identifies whether the min and max values are ordered and
// if so, which direction it is stored in.
//
// It is an alias of the thrift-generated format.BoundaryOrder.
type BoundaryOrder = format.BoundaryOrder

// BoundaryOrder values, re-exported from the thrift-generated format package.
const (
	// Unordered means the per-page min/max values are not known to be sorted.
	Unordered BoundaryOrder = format.BoundaryOrder_UNORDERED
	// Ascending means neither the per-page min values nor the per-page max
	// values ever decrease from one page to the next.
	Ascending BoundaryOrder = format.BoundaryOrder_ASCENDING
	// Descending means neither the per-page min values nor the per-page max
	// values ever increase from one page to the next.
	Descending BoundaryOrder = format.BoundaryOrder_DESCENDING
)
// ColumnIndex is an interface for reading optional statistics for
// each data page of a column chunk. Along with the OffsetIndex this
// forms the PageIndex for a column chunk.
//
// The thrift-generated *format.ColumnIndex satisfies this interface; a
// TypedColumnIndex embeds one and adds decoded, typed min/max accessors.
type ColumnIndex interface {
	// GetNullPages returns a list of bools to determine the validity of the
	// corresponding min/max values. If the value is true, then that page
	// contains only null values.
	GetNullPages() []bool
	// IsSetNullCounts returns true if the null counts are set.
	IsSetNullCounts() bool
	// GetNullCounts returns the number of null values in each page. This is
	// only valid if IsSetNullCounts returns true.
	GetNullCounts() []int64
	// GetBoundaryOrder returns if the min/max values are ordered and if so,
	// what direction.
	GetBoundaryOrder() BoundaryOrder
	// GetMinValues returns the encoded minimum value for each page. Entries
	// for all-null pages are empty.
	GetMinValues() [][]byte
	// GetMaxValues returns the encoded max value for each page. Entries
	// for all-null pages are empty.
	GetMaxValues() [][]byte
	// GetRepetitionLevelHistograms returns the repetition level histograms
	// stored in the index, if any.
	// NOTE(review): presumably the per-page histograms concatenated in page
	// order, as in the parquet-format thrift definition — confirm the layout.
	GetRepetitionLevelHistograms() []int64
	// GetDefinitionLevelHistograms returns the definition level histograms
	// stored in the index, if any.
	// NOTE(review): presumably the per-page histograms concatenated in page
	// order, as in the parquet-format thrift definition — confirm the layout.
	GetDefinitionLevelHistograms() []int64
}
// TypedColumnIndex expands the ColumnIndex interface to provide a
// type-safe accessor for the min/max values.
type TypedColumnIndex[T parquet.ColumnTypes] struct {
	// ColumnIndex is the underlying index holding the encoded values.
	ColumnIndex
	// minvals and maxvals hold the decoded min/max value of every page,
	// indexed by page ordinal; entries for all-null pages are left as the
	// zero value of T.
	minvals []T
	maxvals []T
	// nonNullPageIndices lists, in ascending order, the ordinals of the
	// pages that are not all-null.
	nonNullPageIndices []int32
}
// typedDecoder is the decoder shape getDecoder asserts on the result of
// encoding.NewDecoder: a TypedDecoder that can also decode straight into a
// []T.
type typedDecoder[T parquet.ColumnTypes] interface {
	encoding.TypedDecoder
	Decode([]T) (int, error)
	DecodeSpaced([]T, int, []byte, int64) (int, error)
}
// must panics with err when it is non-nil; a nil err is a no-op.
func must(err error) {
	if err == nil {
		return
	}
	panic(err)
}

// mustArg passes val through unchanged, panicking with err first if err is
// non-nil. It lets a (value, error) call be used inline as an expression.
func mustArg[T any](val T, err error) T {
	if err != nil {
		panic(err)
	}
	return val
}
// NewColumnIndex deserializes a thrift-encoded column index, decrypting the
// bytes first when decryptor is non-nil.
//
// The physical type of descr selects the concrete *TypedColumnIndex[T] that is
// returned. props is currently unused. It panics if the bytes are not a valid
// thrift ColumnIndex, if the index is internally inconsistent, or if descr has
// an unrecognized physical type.
func NewColumnIndex(descr *schema.Column, serializedIndex []byte, props *parquet.ReaderProperties, decryptor encryption.Decryptor) ColumnIndex {
	payload := serializedIndex
	if decryptor != nil {
		payload = decryptor.Decrypt(payload)
	}

	colidx := new(format.ColumnIndex)
	if _, err := thrift.DeserializeThrift(colidx, payload); err != nil {
		panic(err)
	}

	switch descr.PhysicalType() {
	case parquet.Types.Boolean:
		return newTypedColumnIndex[bool](descr, colidx)
	case parquet.Types.Int32:
		return newTypedColumnIndex[int32](descr, colidx)
	case parquet.Types.Int64:
		return newTypedColumnIndex[int64](descr, colidx)
	case parquet.Types.Int96:
		return newTypedColumnIndex[parquet.Int96](descr, colidx)
	case parquet.Types.Float:
		return newTypedColumnIndex[float32](descr, colidx)
	case parquet.Types.Double:
		return newTypedColumnIndex[float64](descr, colidx)
	case parquet.Types.ByteArray:
		return newTypedColumnIndex[parquet.ByteArray](descr, colidx)
	case parquet.Types.FixedLenByteArray:
		return newTypedColumnIndex[parquet.FixedLenByteArray](descr, colidx)
	default:
		panic("unreachable: cannot make columnindex of unknown type")
	}
}
// getDecoder returns a function that turns one encoded min/max entry of a
// column index into a value of type T.
//
// ByteArray and FixedLenByteArray entries are converted directly. Every other
// physical type goes through a single PLAIN decoder that is created here and
// reused by the returned closure, together with a shared one-element output
// buffer, so the closure must not be called concurrently. It panics when
// decoding fails.
func getDecoder[T parquet.ColumnTypes](descr *schema.Column) func([]byte) T {
	var fn any
	switch descr.PhysicalType() {
	case parquet.Types.ByteArray:
		fn = func(data []byte) parquet.ByteArray { return parquet.ByteArray(data) }
	case parquet.Types.FixedLenByteArray:
		fn = func(data []byte) parquet.FixedLenByteArray { return parquet.FixedLenByteArray(data) }
	default:
		dec := encoding.NewDecoder(descr.PhysicalType(), parquet.Encodings.Plain, descr, nil).(typedDecoder[T])
		var out [1]T
		return func(data []byte) T {
			if err := dec.SetData(1, data); err != nil {
				panic(err)
			}
			if _, err := dec.Decode(out[:]); err != nil {
				panic(err)
			}
			return out[0]
		}
	}
	// only reached for the byte array cases, where T matches the closure type
	return fn.(func([]byte) T)
}
// newTypedColumnIndex validates colIdx and decodes the min/max values of its
// non-null pages into a TypedColumnIndex[T], which also embeds colIdx itself.
//
// It panics with "invalid column index" when the page count is not below
// math.MaxInt32, or when the min/max lists (and the null counts, if set) do
// not have exactly one entry per page.
func newTypedColumnIndex[T parquet.ColumnTypes](descr *schema.Column, colIdx *format.ColumnIndex) *TypedColumnIndex[T] {
	pageCount := len(colIdx.NullPages)
	valid := pageCount < math.MaxInt32 &&
		len(colIdx.MinValues) == pageCount &&
		len(colIdx.MaxValues) == pageCount &&
		(!colIdx.IsSetNullCounts() || len(colIdx.NullCounts) == pageCount)
	if !valid {
		panic("invalid column index")
	}

	nonNull := 0
	for _, isNull := range colIdx.NullPages {
		if !isNull {
			nonNull++
		}
	}
	debug.Assert(nonNull <= pageCount, "invalid column index")

	result := &TypedColumnIndex[T]{
		ColumnIndex:        colIdx,
		minvals:            make([]T, pageCount),
		maxvals:            make([]T, pageCount),
		nonNullPageIndices: make([]int32, 0, nonNull),
	}

	decode := getDecoder[T](descr)
	for page, isNull := range colIdx.NullPages {
		if isNull {
			// all-null pages keep the zero value of T
			continue
		}
		// the page-count check above guarantees page fits in an int32
		result.nonNullPageIndices = append(result.nonNullPageIndices, int32(page))
		result.minvals[page] = decode(colIdx.MinValues[page])
		result.maxvals[page] = decode(colIdx.MaxValues[page])
	}
	debug.Assert(len(result.nonNullPageIndices) == nonNull, "invalid column index")
	return result
}
// MinValues returns the decoded minimum value of every page, indexed by page
// ordinal. Entries for all-null pages hold the zero value of T. The slice is
// the index's own storage, not a copy.
func (c *TypedColumnIndex[T]) MinValues() []T { return c.minvals }

// MaxValues returns the decoded maximum value of every page, indexed by page
// ordinal. Entries for all-null pages hold the zero value of T. The slice is
// the index's own storage, not a copy.
func (c *TypedColumnIndex[T]) MaxValues() []T { return c.maxvals }

// NonNullPageIndices returns, in ascending order, the ordinals of the pages
// that are not all-null, i.e. the pages whose min/max values are meaningful.
func (c *TypedColumnIndex[T]) NonNullPageIndices() []int32 { return c.nonNullPageIndices }
type (
	// PageLocation describes where in a file a particular page can be found,
	// along with the index within the rowgroup of the first row in the page.
	// It is an alias of the thrift-generated format.PageLocation.
	PageLocation = format.PageLocation

	// PageIndexSelection chooses which kinds of page index are requested
	// when calling PageIndexReader.WillNeed.
	PageIndexSelection struct {
		// specifies whether to read the column index
		ColumnIndex bool
		// specifies whether to read the offset index
		OffsetIndex bool
	}
)
// OffsetIndex forms the page index alongside a ColumnIndex,
// the OffsetIndex may be present even if a ColumnIndex is not.
//
// The thrift-generated *format.OffsetIndex satisfies this interface.
type OffsetIndex interface {
	// GetPageLocations returns the location of each page of the column chunk.
	GetPageLocations() []*PageLocation
	// GetUnencodedByteArrayDataBytes returns per-page unencoded BYTE_ARRAY
	// data sizes, if present.
	// NOTE(review): semantics come from the parquet-format thrift field
	// unencoded_byte_array_data_bytes — confirm before relying on it.
	GetUnencodedByteArrayDataBytes() []int64
}
// String renders the selection for debugging, e.g.
// "PageIndexSelection{column_index = true, offset_index = false}".
func (p PageIndexSelection) String() string {
	const layout = "PageIndexSelection{column_index = %t, offset_index = %t}"
	return fmt.Sprintf(layout, p.ColumnIndex, p.OffsetIndex)
}
// NewOffsetIndex constructs an OffsetIndex object from the thrift serialized bytes,
// optionally decrypting it if it was encrypted.
//
// The reader properties argument is currently unused. It panics if the
// (decrypted) bytes cannot be deserialized as a thrift OffsetIndex.
func NewOffsetIndex(serializedIndex []byte, _ *parquet.ReaderProperties, decryptor encryption.Decryptor) OffsetIndex {
	payload := serializedIndex
	if decryptor != nil {
		payload = decryptor.Decrypt(payload)
	}

	oidx := new(format.OffsetIndex)
	if _, err := thrift.DeserializeThrift(oidx, payload); err != nil {
		panic(err)
	}
	return oidx
}
// readRange is the byte range [Offset, Offset+Length) of the input.
type readRange struct {
	Offset int64
	Length int64
}

// Contains reports whether other lies entirely within r: it starts no earlier
// than r does and ends no later than r does.
func (r readRange) Contains(other readRange) bool {
	if other.Offset < r.Offset {
		return false
	}
	end, otherEnd := r.Offset+r.Length, other.Offset+other.Length
	return otherEnd <= end
}
// checkReadRange validates that the page index location loc lies entirely
// within idxRange, the coalesced byte range recorded for reading the page
// indexes of row group rgOrdinal.
//
// It returns an error wrapping arrow.ErrInvalid if idxRange is nil (the index
// does not exist or was not requested), if either range has a negative offset
// or a non-positive length, or if loc is not fully contained in idxRange.
func checkReadRange(loc IndexLocation, idxRange *readRange, rgOrdinal int32) error {
	if idxRange == nil {
		return fmt.Errorf("%w: missing page index read range of row group %d, it may not exist or has not been requested",
			arrow.ErrInvalid, rgOrdinal)
	}
	// coalesced read range is invalid
	if idxRange.Offset < 0 || idxRange.Length <= 0 {
		return fmt.Errorf("%w: invalid page index read range: offset %d, length %d",
			arrow.ErrInvalid, idxRange.Offset, idxRange.Length)
	}
	if loc.Offset < 0 || loc.Length <= 0 {
		return fmt.Errorf("%w: invalid page index location: offset %d, length %d",
			arrow.ErrInvalid, loc.Offset, loc.Length)
	}
	// Compare positions relative to the start of idxRange instead of absolute
	// end offsets: loc.Offset+loc.Length can wrap around for a corrupt, very
	// large offset and slip past the bounds check. Once loc.Offset >=
	// idxRange.Offset >= 0 and both lengths are positive, neither subtraction
	// below can overflow.
	if loc.Offset < idxRange.Offset ||
		loc.Offset-idxRange.Offset > idxRange.Length-int64(loc.Length) {
		return fmt.Errorf("%w: Page index location [offset:%d,length:%d] is out of range from previous WillNeed request [offset:%d,length:%d], row group: %d",
			arrow.ErrInvalid, loc.Offset, loc.Length, idxRange.Offset, idxRange.Length, rgOrdinal)
	}
	return nil
}
// rgIndexReadRange holds, for one row group, the coalesced byte ranges that
// cover the column indexes and the offset indexes of the selected columns.
// A nil range means that kind of index is absent or was not requested.
type rgIndexReadRange struct {
	ColIndex, OffsetIndex *readRange
}
// RowGroupPageIndexReader is a read-only object for retrieving column and offset
// indexes for a given row group.
//
// Instances are created by PageIndexReader.RowGroup. The raw index bytes are
// read lazily on first access and the deserialized indexes are cached per
// column; GetColumnIndex and GetOffsetIndex hold mx while touching this state.
type RowGroupPageIndexReader struct {
	// input is where the page index bytes are read from
	input parquet.ReaderAtSeeker
	rowGroupMetadata *RowGroupMetaData
	props *parquet.ReaderProperties
	// rgOrdinal is the row group's ordinal in the file; it is also fed to the
	// decryptor when updating its AAD
	rgOrdinal int32
	// idxReadRange bounds which bytes may be read; every index location is
	// validated against it with checkReadRange
	idxReadRange rgIndexReadRange
	// fileDecryptor is passed to encryption.GetColumnMetaDecryptor
	fileDecryptor encryption.FileDecryptor
	// buffers to hold raw bytes of page index
	// will be lazily set when the corresponding page index is accessed
	colIndexBuffer, offsetIndexBuffer []byte
	// cache of column indexes
	colIndexes map[int]ColumnIndex
	// cache of offset indices
	offsetIndices map[int]OffsetIndex
	mx sync.Mutex
}
// GetColumnIndex returns the ColumnIndex of the column chunk at ordinal i in
// this row group. On first access it reads, decrypts (if needed) and
// deserializes the index, then caches it for later calls.
//
// It returns (nil, nil) when the column chunk has no column index location.
// It returns an error for an out-of-range ordinal, for a location outside the
// read range recorded for this row group, or for a failed read of the input.
// Malformed thrift data makes NewColumnIndex panic; that panic is not
// recovered here.
func (r *RowGroupPageIndexReader) GetColumnIndex(i int) (ColumnIndex, error) {
	if i < 0 || i >= r.rowGroupMetadata.NumColumns() {
		return nil, fmt.Errorf("%w: invalid column index at column ordinal %d",
			arrow.ErrInvalid, i)
	}

	r.mx.Lock()
	defer r.mx.Unlock()

	if idx, ok := r.colIndexes[i]; ok {
		return idx, nil
	}
	if r.colIndexes == nil {
		r.colIndexes = make(map[int]ColumnIndex)
	}

	colChunk, err := r.rowGroupMetadata.ColumnChunk(i)
	if err != nil {
		return nil, err
	}
	loc := colChunk.GetColumnIndexLocation()
	if loc == nil {
		return nil, nil
	}
	rng := r.idxReadRange.ColIndex
	if err := checkReadRange(*loc, rng, r.rgOrdinal); err != nil {
		return nil, err
	}

	if r.colIndexBuffer == nil {
		// Read into a local buffer and cache it only after a successful read.
		// Otherwise a failed read would leave a zero-filled buffer behind that
		// later calls would silently deserialize.
		buf := make([]byte, rng.Length)
		// io.ReaderAt may return io.EOF together with a complete read that
		// ends exactly at the end of the input; that is not a failure.
		if n, err := r.input.ReadAt(buf, rng.Offset); err != nil && !(err == io.EOF && n == len(buf)) {
			return nil, err
		}
		r.colIndexBuffer = buf
	}

	// checkReadRange has validated that loc lies within rng, so this slice is
	// in bounds. The payload is limited to exactly this column's bytes.
	start := loc.Offset - rng.Offset
	serialized := r.colIndexBuffer[start : start+int64(loc.Length)]

	descr := r.rowGroupMetadata.Schema.Column(i)
	decryptor, err := encryption.GetColumnMetaDecryptor(colChunk.CryptoMetadata(), r.fileDecryptor)
	if err != nil {
		return nil, err
	}
	if decryptor != nil {
		encryption.UpdateDecryptor(decryptor, int16(r.rgOrdinal),
			int16(i), encryption.ColumnIndexModule)
	}

	idx := NewColumnIndex(descr, serialized, r.props, decryptor)
	r.colIndexes[i] = idx
	return idx, nil
}
// GetOffsetIndex returns the OffsetIndex of the column chunk at ordinal i in
// this row group. On first access it reads, decrypts (if needed) and
// deserializes the index, then caches it for later calls.
//
// It returns (nil, nil) when the column chunk has no offset index location.
// It returns an error for an out-of-range ordinal, for a location outside the
// read range recorded for this row group, or for a failed read of the input.
// Malformed thrift data makes NewOffsetIndex panic; that panic is not
// recovered here.
func (r *RowGroupPageIndexReader) GetOffsetIndex(i int) (OffsetIndex, error) {
	if i < 0 || i >= r.rowGroupMetadata.NumColumns() {
		return nil, fmt.Errorf("%w: invalid column index at column ordinal %d",
			arrow.ErrInvalid, i)
	}

	r.mx.Lock()
	defer r.mx.Unlock()

	if idx, ok := r.offsetIndices[i]; ok {
		return idx, nil
	}
	if r.offsetIndices == nil {
		r.offsetIndices = make(map[int]OffsetIndex)
	}

	colChunk, err := r.rowGroupMetadata.ColumnChunk(i)
	if err != nil {
		return nil, err
	}
	loc := colChunk.GetOffsetIndexLocation()
	if loc == nil {
		return nil, nil
	}
	rng := r.idxReadRange.OffsetIndex
	if err := checkReadRange(*loc, rng, r.rgOrdinal); err != nil {
		return nil, err
	}

	if r.offsetIndexBuffer == nil {
		// Read into a local buffer and cache it only after a successful read.
		// Otherwise a failed read would leave a zero-filled buffer behind that
		// later calls would silently deserialize.
		buf := make([]byte, rng.Length)
		// io.ReaderAt may return io.EOF together with a complete read that
		// ends exactly at the end of the input; that is not a failure.
		if n, err := r.input.ReadAt(buf, rng.Offset); err != nil && !(err == io.EOF && n == len(buf)) {
			return nil, err
		}
		r.offsetIndexBuffer = buf
	}

	// checkReadRange has validated that loc lies within rng, so this slice is
	// in bounds. The payload is limited to exactly this column's bytes.
	start := loc.Offset - rng.Offset
	serialized := r.offsetIndexBuffer[start : start+int64(loc.Length)]

	decryptor, err := encryption.GetColumnMetaDecryptor(colChunk.CryptoMetadata(), r.fileDecryptor)
	if err != nil {
		return nil, err
	}
	if decryptor != nil {
		encryption.UpdateDecryptor(decryptor, int16(r.rgOrdinal),
			int16(i), encryption.OffsetIndexModule)
	}

	oidx := NewOffsetIndex(serialized, r.props, decryptor)
	r.offsetIndices[i] = oidx
	return oidx, nil
}
// PageIndexReader is a read-only object for retrieving the Column and Offset indexes
// for a particular parquet file.
//
// The exported fields are set by the caller before use. WillNeed and
// WillNotNeed update idxReadRanges without any locking.
// NOTE(review): presumably these must not run concurrently with RowGroup —
// confirm with callers.
type PageIndexReader struct {
	Input parquet.ReaderAtSeeker
	FileMetadata *FileMetaData
	Props *parquet.ReaderProperties
	Decryptor encryption.FileDecryptor
	// coalesced read ranges of page index of row groups that have
	// been suggested by WillNeed(). key is the row group ordinal
	idxReadRanges map[int32]rgIndexReadRange
}
// determinePageIndexRangesInRowGroup computes, over the columns cols of the
// row group described by rgMeta, the smallest byte range covering all of their
// column index locations and the smallest one covering all of their offset
// index locations.
//
// An empty cols selects every column of the row group. A returned range is nil
// when none of the selected columns has that kind of index. Column chunks whose
// metadata cannot be obtained are skipped; the error from ColumnChunk is
// ignored. An error wrapping arrow.ErrIndex is returned for an out-of-range
// column ordinal, or for a location with a negative offset, a non-positive
// length, or an end offset that shared_utils.Add rejects (presumably on int64
// overflow).
func determinePageIndexRangesInRowGroup(rgMeta *RowGroupMetaData, cols []int32) (rng rgIndexReadRange, err error) {
	type span struct{ start, end int64 }

	// extend widens s so that it also covers loc; a nil loc is ignored.
	extend := func(s *span, loc *IndexLocation) error {
		if loc == nil {
			return nil
		}
		end, ok := shared_utils.Add(loc.Offset, int64(loc.Length))
		if !ok || loc.Offset < 0 || loc.Length <= 0 {
			return fmt.Errorf("%w: invalid page index location: offset %d length %d",
				arrow.ErrIndex, loc.Offset, loc.Length)
		}
		s.start = min(s.start, loc.Offset)
		s.end = max(s.end, end)
		return nil
	}

	numCols := rgMeta.NumColumns()
	if len(cols) == 0 {
		cols = make([]int32, numCols)
		for c := range cols {
			cols[c] = int32(c)
		}
	}

	// end == -1 marks a span that never saw a location
	colIdx := span{start: math.MaxInt64, end: -1}
	offIdx := span{start: math.MaxInt64, end: -1}
	for _, c := range cols {
		if c < 0 || c >= int32(numCols) {
			return rgIndexReadRange{}, fmt.Errorf("%w: invalid column ordinal %d", arrow.ErrIndex, c)
		}
		// best effort: a chunk whose metadata is unavailable contributes nothing
		chunk, _ := rgMeta.ColumnChunk(int(c))
		if chunk == nil {
			continue
		}
		if err = extend(&colIdx, chunk.GetColumnIndexLocation()); err != nil {
			return rgIndexReadRange{}, err
		}
		if err = extend(&offIdx, chunk.GetOffsetIndexLocation()); err != nil {
			return rgIndexReadRange{}, err
		}
	}

	var out rgIndexReadRange
	if colIdx.end != -1 {
		out.ColIndex = &readRange{Offset: colIdx.start, Length: colIdx.end - colIdx.start}
	}
	if offIdx.end != -1 {
		out.OffsetIndex = &readRange{Offset: offIdx.start, Length: offIdx.end - offIdx.start}
	}
	return out, nil
}
// RowGroup returns a reader for the page indexes of row group i.
//
// If WillNeed was called for this row group, only the indexes it requested are
// readable. Otherwise both the column and offset indexes of every column chunk
// are. It returns (nil, nil) when the row group has no readable page index, and
// an error for an out-of-range ordinal or an invalid index location.
func (r *PageIndexReader) RowGroup(i int) (*RowGroupPageIndexReader, error) {
	if i < 0 || i >= r.FileMetadata.NumRowGroups() {
		return nil, fmt.Errorf("%w: invalid row group ordinal %d", arrow.ErrInvalid, i)
	}

	rgMeta := r.FileMetadata.RowGroup(i)
	rng, requested := r.idxReadRanges[int32(i)]
	if !requested {
		// not narrowed down by WillNeed: allow every index of the row group
		computed, err := determinePageIndexRangesInRowGroup(rgMeta, nil)
		if err != nil {
			return nil, err
		}
		rng = computed
	}

	if rng.ColIndex == nil && rng.OffsetIndex == nil {
		// the row group does not have a page index or none was requested
		return nil, nil
	}

	return &RowGroupPageIndexReader{
		input:            r.Input,
		rowGroupMetadata: rgMeta,
		props:            r.Props,
		rgOrdinal:        int32(i),
		idxReadRange:     rng,
		fileDecryptor:    r.Decryptor,
	}, nil
}
// WillNeed records which page indexes of the given row groups will be read,
// so that RowGroup only exposes the requested kinds of index for them.
//
// rgIndices are row group ordinals. colIndices are column ordinals within each
// row group; an empty slice selects every column. selection chooses which kinds
// of index are requested. A later call for the same row group replaces the
// earlier request.
//
// An error is returned for an out-of-range row group or column ordinal, or for
// an invalid index location. Row groups processed before the failing one keep
// their recorded ranges.
func (r *PageIndexReader) WillNeed(rgIndices, colIndices []int32, selection PageIndexSelection) error {
	if r.idxReadRanges == nil {
		r.idxReadRanges = make(map[int32]rgIndexReadRange)
	}

	numRowGroups := r.FileMetadata.NumRowGroups()
	for _, ordinal := range rgIndices {
		// Validate before calling FileMetadata.RowGroup, mirroring the check in
		// RowGroup, so a bad ordinal is reported as an error.
		if ordinal < 0 || int(ordinal) >= numRowGroups {
			return fmt.Errorf("%w: invalid row group ordinal %d", arrow.ErrInvalid, ordinal)
		}

		rng, err := determinePageIndexRangesInRowGroup(r.FileMetadata.RowGroup(int(ordinal)), colIndices)
		if err != nil {
			return err
		}
		if !selection.ColumnIndex {
			// mark column index as not requested
			rng.ColIndex = nil
		}
		if !selection.OffsetIndex {
			// mark offset index as not requested
			rng.OffsetIndex = nil
		}
		r.idxReadRanges[ordinal] = rng
	}
	// TODO: possibly use read ranges to prefetch data of the input
	return nil
}
// WillNotNeed forgets the read ranges recorded by WillNeed for the given row
// groups. RowGroup then falls back to allowing every page index of those row
// groups.
func (r *PageIndexReader) WillNotNeed(rgIndices []int32) {
	// delete on a nil map is a no-op, so no nil check is required
	for _, ordinal := range rgIndices {
		delete(r.idxReadRanges, ordinal)
	}
}
// builderState tracks the lifecycle of the column and offset index builders:
// created -> started (a page was added) -> finished. A builder becomes
// discarded when it is finished without any page, or (column indexes only)
// when a non-null page lacks min/max statistics.
type builderState int8

const (
	// stateCreated: no page has been added yet (zero value).
	stateCreated builderState = iota
	// stateStarted: at least one page has been added.
	stateStarted
	// stateFinished: Finish succeeded; the index can be built and written.
	stateFinished
	// stateDiscarded: the index will not be built or written.
	stateDiscarded
)
// ColumnIndexBuilder is an interface for constructing column indexes,
// with the concrete implementations being fully typed.
type ColumnIndexBuilder interface {
	// AddPage records the encoded statistics of the next data page. In the
	// implementation here, a non-null page without both min and max discards
	// the whole index.
	AddPage(stats *EncodedStatistics) error
	// Finish completes the index and determines its boundary order.
	Finish() error
	// WriteTo serializes the finished index to w, handing encryptor to the
	// thrift serializer, and returns the number of bytes written (0 when the
	// index is not finished).
	WriteTo(w io.Writer, encryptor encryption.Encryptor) (int, error)
	// Build returns the finished index, or nil if it is not finished.
	Build() ColumnIndex
}
// columnIndexBuilder is the ColumnIndexBuilder implementation for values of
// physical type T.
type columnIndexBuilder[T parquet.ColumnTypes] struct {
	// descr selects the value decoder and comparator used by Finish
	descr *schema.Column
	// colIndex accumulates the thrift column index, one entry per page
	colIndex format.ColumnIndex
	// nonNullPageIndices holds the ordinals of pages that have min/max values
	nonNullPageIndices []int64
	state builderState
}
// NewColumnIndexBuilder creates a new typed ColumnIndexBuilder for the given column descriptor.
//
// It returns nil for parquet.Types.Undefined and panics for any other physical
// type it does not recognize.
func NewColumnIndexBuilder(descr *schema.Column) ColumnIndexBuilder {
	var bldr ColumnIndexBuilder
	switch descr.PhysicalType() {
	case parquet.Types.Boolean:
		bldr = newColumnIndexBuilder[bool](descr)
	case parquet.Types.Int32:
		bldr = newColumnIndexBuilder[int32](descr)
	case parquet.Types.Int64:
		bldr = newColumnIndexBuilder[int64](descr)
	case parquet.Types.Int96:
		bldr = newColumnIndexBuilder[parquet.Int96](descr)
	case parquet.Types.Float:
		bldr = newColumnIndexBuilder[float32](descr)
	case parquet.Types.Double:
		bldr = newColumnIndexBuilder[float64](descr)
	case parquet.Types.ByteArray:
		bldr = newColumnIndexBuilder[parquet.ByteArray](descr)
	case parquet.Types.FixedLenByteArray:
		bldr = newColumnIndexBuilder[parquet.FixedLenByteArray](descr)
	case parquet.Types.Undefined:
		// no column index can be built for an undefined physical type
		return nil
	default:
		panic("unreachable: cannot make column index builder of unknown type")
	}
	return bldr
}
// newColumnIndexBuilder returns an empty builder in the created state.
//
// NullCounts starts as a non-nil empty slice so that IsSetNullCounts reports
// it as set until AddPage sees a page without a null count and clears it.
// NOTE(review): this relies on the thrift-generated IsSetNullCounts testing
// NullCounts != nil — confirm against the generated code.
func newColumnIndexBuilder[T parquet.ColumnTypes](descr *schema.Column) *columnIndexBuilder[T] {
	bldr := &columnIndexBuilder[T]{descr: descr, state: stateCreated}
	bldr.colIndex.NullCounts = []int64{}
	bldr.colIndex.BoundaryOrder = Unordered
	bldr.nonNullPageIndices = []int64{}
	return bldr
}
// AddPage appends the statistics of the next page to the index.
//
// An all-null page contributes empty min/max entries. A page with both min and
// max contributes them. Any other page discards the whole index, and later
// calls are then ignored. Null counts are kept only while every page provides
// one. Adding to a finished builder is an error.
func (b *columnIndexBuilder[T]) AddPage(stats *EncodedStatistics) error {
	switch b.state {
	case stateFinished:
		return fmt.Errorf("%w: cannot add page to finished ColumnIndexBuilder", arrow.ErrInvalid)
	case stateDiscarded:
		return nil
	}
	b.state = stateStarted

	ci := &b.colIndex
	if stats.AllNullValue {
		ci.NullPages = append(ci.NullPages, true)
		// thrift deserializes a nil and an empty byte slice alike as empty,
		// so append empty slices to keep round-trip comparisons consistent
		ci.MinValues = append(ci.MinValues, []byte{})
		ci.MaxValues = append(ci.MaxValues, []byte{})
	} else if stats.HasMin && stats.HasMax {
		// the ordinal of this page is the number of pages recorded so far
		b.nonNullPageIndices = append(b.nonNullPageIndices, int64(len(ci.NullPages)))
		ci.NullPages = append(ci.NullPages, false)
		ci.MinValues = append(ci.MinValues, stats.Min)
		ci.MaxValues = append(ci.MaxValues, stats.Max)
	} else {
		// a non-null page without meaningful min/max: drop the column index
		b.state = stateDiscarded
		return nil
	}

	if !ci.IsSetNullCounts() || !stats.HasNullCount {
		ci.NullCounts = nil
		return nil
	}
	ci.NullCounts = append(ci.NullCounts, stats.NullCount)
	return nil
}
// Finish completes the column index.
//
// A builder with no pages becomes discarded, and a discarded builder stays so.
// Finishing twice is an error. Otherwise the min/max values of the non-null
// pages are decoded and used to set the index's BoundaryOrder.
func (b *columnIndexBuilder[T]) Finish() error {
	switch b.state {
	case stateFinished:
		return fmt.Errorf("%w: ColumnIndexBuilder is already finished", arrow.ErrInvalid)
	case stateCreated:
		// no page added, discard the column index
		b.state = stateDiscarded
		return nil
	case stateDiscarded:
		return nil
	}
	b.state = stateFinished

	// drop the null counts when at least one page did not provide them
	if !b.colIndex.IsSetNullCounts() {
		b.colIndex.NullCounts = nil
	}

	decode := getDecoder[T](b.descr)
	mins := make([]T, 0, len(b.nonNullPageIndices))
	maxs := make([]T, 0, len(b.nonNullPageIndices))
	for _, page := range b.nonNullPageIndices {
		mins = append(mins, decode(b.colIndex.MinValues[page]))
		maxs = append(maxs, decode(b.colIndex.MaxValues[page]))
	}
	b.colIndex.BoundaryOrder = b.determineBoundaryOrder(mins, maxs)
	return nil
}
// Build returns the finished index as a typed ColumnIndex sharing the
// builder's thrift structure, or nil when the builder is not finished.
func (b *columnIndexBuilder[T]) Build() ColumnIndex {
	if b.state == stateFinished {
		return newTypedColumnIndex[T](b.descr, &b.colIndex)
	}
	return nil
}

// WriteTo serializes the finished index to w via the thrift serializer,
// handing it encryptor, and returns the byte count. Unfinished or discarded
// indexes write nothing and report 0.
func (b *columnIndexBuilder[T]) WriteTo(w io.Writer, encryptor encryption.Encryptor) (int, error) {
	if b.state != stateFinished {
		return 0, nil
	}
	return thrift.NewThriftSerializer().Serialize(&b.colIndex, w, encryptor)
}
// determineBoundaryOrder classifies the per-page min/max values of the
// non-null pages, given in page order. It uses comp.Compare as the strict
// ordering (presumably "less than"):
//   - Ascending when neither the min nor the max sequence ever decreases.
//   - Otherwise, Descending when neither sequence ever increases.
//   - Otherwise, Unordered.
//
// Sequences of equal values are therefore Ascending. It also reports Unordered
// when there are no values or when no comparator can be created for the column.
func (b *columnIndexBuilder[T]) determineBoundaryOrder(minVals, maxVals []T) BoundaryOrder {
	debug.Assert(len(minVals) == len(maxVals), "min/max values length mismatch")
	if len(minVals) == 0 {
		return Unordered
	}

	comp, err := NewTypedComparator[T](b.descr)
	if err != nil {
		return Unordered
	}

	// monotone reports whether no adjacent pair (prev, cur) of the min values,
	// nor of the max values, satisfies violates(prev, cur).
	monotone := func(violates func(prev, cur T) bool) bool {
		for i := 1; i < len(minVals); i++ {
			if violates(minVals[i-1], minVals[i]) || violates(maxVals[i-1], maxVals[i]) {
				return false
			}
		}
		return true
	}

	switch {
	case monotone(func(prev, cur T) bool { return comp.Compare(cur, prev) }):
		return Ascending
	case monotone(func(prev, cur T) bool { return comp.Compare(prev, cur) }):
		return Descending
	default:
		return Unordered
	}
}
// OffsetIndexBuilder provides a way to construct new OffsetIndexes while writing
// a parquet file.
//
// The zero value is ready to use (its state is stateCreated). Add pages with
// AddPage or AddPageLoc, then call Finish before WriteTo or Build produce any
// output.
type OffsetIndexBuilder struct {
	offsetIndex format.OffsetIndex
	state builderState
}
// AddPageLoc is AddPage taking its arguments from an existing PageLocation.
func (o *OffsetIndexBuilder) AddPageLoc(pgloc PageLocation) error {
	offset, firstRow, size := pgloc.Offset, pgloc.FirstRowIndex, pgloc.CompressedPageSize
	return o.AddPage(offset, firstRow, size)
}

// AddPage appends the location of the next page. Offsets are later shifted by
// the final position passed to Finish (presumably they are relative to the
// start of the column chunk). Adding to a finished builder is an error; adding
// to a discarded builder is silently ignored.
func (o *OffsetIndexBuilder) AddPage(offset, firstRowIdx int64, compressedPgSize int32) error {
	switch o.state {
	case stateFinished:
		return fmt.Errorf("%w: cannot add page to finished OffsetIndexBuilder", arrow.ErrInvalid)
	case stateDiscarded:
		// offset index is discarded, do nothing
		return nil
	}
	o.state = stateStarted

	loc := &PageLocation{
		Offset:             offset,
		FirstRowIndex:      firstRowIdx,
		CompressedPageSize: compressedPgSize,
	}
	o.offsetIndex.PageLocations = append(o.offsetIndex.PageLocations, loc)
	return nil
}
// Finish completes the offset index.
//
// A builder with no pages becomes discarded. Otherwise every page offset is
// shifted by finalPos when it is positive. Calling Finish on an already
// finished or discarded builder is an error.
func (o *OffsetIndexBuilder) Finish(finalPos int64) error {
	switch o.state {
	case stateFinished, stateDiscarded:
		return fmt.Errorf("%w: OffsetIndexBuilder is already finished", arrow.ErrInvalid)
	case stateCreated:
		o.state = stateDiscarded
	case stateStarted:
		o.state = stateFinished
		if finalPos <= 0 {
			return nil
		}
		for _, loc := range o.offsetIndex.PageLocations {
			loc.Offset += finalPos
		}
	}
	return nil
}
// WriteTo serializes the finished offset index to w via the thrift
// serializer, handing it encryptor, and returns the byte count. Unfinished or
// discarded indexes write nothing and report 0.
func (o *OffsetIndexBuilder) WriteTo(w io.Writer, encryptor encryption.Encryptor) (int, error) {
	if o.state != stateFinished {
		return 0, nil
	}
	return thrift.NewThriftSerializer().Serialize(&o.offsetIndex, w, encryptor)
}

// Build returns the finished offset index, which shares the builder's
// storage, or nil when the builder is not finished.
func (o *OffsetIndexBuilder) Build() OffsetIndex {
	if o.state == stateFinished {
		return &o.offsetIndex
	}
	return nil
}
// PageIndexBuilder manages the creation of the entire PageIndex for a parquet file,
// managing the builders for each row group as they are added and providing getters
// to retrieve the particular builders for specific columns and row groups.
//
// Typical use: call AppendRowGroup before requesting builders for a row
// group's columns, call Finish once all row groups are done, then WriteTo.
type PageIndexBuilder struct {
	// Schema of the file; AppendRowGroup requires it to be set
	Schema *schema.Schema
	// Encryptor, if non-nil, supplies the per-column metadata encryptors used
	// when serializing the indexes
	Encryptor encryption.FileEncryptor
	// colIndexBuilders[rg][col] and offsetIndexBuilders[rg][col] are created
	// lazily by the getters; a nil entry means no builder was requested
	colIndexBuilders [][]ColumnIndexBuilder
	offsetIndexBuilders [][]*OffsetIndexBuilder
	finished bool
}
// AppendRowGroup starts a new row group; subsequent builder getters refer to
// it. Every column of the row group gets an empty slot that is filled on
// demand. It fails if the builder is finished or has no schema.
func (b *PageIndexBuilder) AppendRowGroup() error {
	switch {
	case b.finished:
		return fmt.Errorf("%w: cannot append row group to finished PageIndexBuilder", arrow.ErrInvalid)
	case b.Schema == nil:
		return fmt.Errorf("%w: schema is not set in PageIndexBuilder", arrow.ErrInvalid)
	}

	n := b.Schema.NumColumns()
	b.colIndexBuilders = append(b.colIndexBuilders, make([]ColumnIndexBuilder, n))
	b.offsetIndexBuilders = append(b.offsetIndexBuilders, make([]*OffsetIndexBuilder, n))
	debug.Assert(len(b.colIndexBuilders) == len(b.offsetIndexBuilders), "column and offset index builders mismatch")
	return nil
}
// GetColumnIndexBuilder returns the column index builder for column i of the
// most recently appended row group, creating it on first request. The
// returned builder is nil for a column whose physical type is undefined.
func (b *PageIndexBuilder) GetColumnIndexBuilder(i int) (ColumnIndexBuilder, error) {
	if err := b.checkState(i); err != nil {
		return nil, err
	}
	current := b.colIndexBuilders[len(b.colIndexBuilders)-1]
	if current[i] == nil {
		current[i] = NewColumnIndexBuilder(b.Schema.Column(i))
	}
	return current[i], nil
}
// GetOffsetIndexBuilder returns the offset index builder for column i of the
// most recently appended row group, creating it on first request.
func (b *PageIndexBuilder) GetOffsetIndexBuilder(i int) (*OffsetIndexBuilder, error) {
	if err := b.checkState(i); err != nil {
		return nil, err
	}
	current := b.offsetIndexBuilders[len(b.offsetIndexBuilders)-1]
	if current[i] == nil {
		current[i] = &OffsetIndexBuilder{}
	}
	return current[i], nil
}
// Finish marks the builder as finished. After this, no row groups can be
// appended, no builders can be obtained, and WriteTo becomes available.
func (b *PageIndexBuilder) Finish() {
	b.finished = true
}

// WriteTo serializes every column index (row group by row group) and then
// every offset index to w. It records where each one was written in location,
// replacing any maps location previously held; only row groups with at least
// one written index get an entry. It fails if the builder is not finished or
// if serialization fails.
func (b *PageIndexBuilder) WriteTo(w utils.WriterTell, location *PageIndexLocation) error {
	if !b.finished {
		return fmt.Errorf("%w: PageIndexBuilder is not finished", arrow.ErrInvalid)
	}

	location.ColIndexLocation = make(map[uint64][]*IndexLocation)
	location.OffsetIndexLocation = make(map[uint64][]*IndexLocation)

	err := serializeIndex(b.Schema, b.colIndexBuilders, w, location.ColIndexLocation,
		encryption.ColumnIndexModule, b.getColumnMetaEncryptor)
	if err != nil {
		return err
	}
	return serializeIndex(b.Schema, b.offsetIndexBuilders, w, location.OffsetIndexLocation,
		encryption.OffsetIndexModule, b.getColumnMetaEncryptor)
}
// checkState verifies that a builder for column ordinal col may be obtained.
// The PageIndexBuilder must not be finished, must have a schema, col must be
// a valid column ordinal, and at least one row group must have been appended.
// Every failure wraps arrow.ErrInvalid.
func (b *PageIndexBuilder) checkState(col int) error {
	if b.finished {
		return fmt.Errorf("%w: cannot add page to finished PageIndexBuilder", arrow.ErrInvalid)
	}
	// Check this before b.Schema.NumColumns() below so that a missing schema
	// is reported as an error, consistent with AppendRowGroup, rather than
	// dereferencing a nil schema.
	if b.Schema == nil {
		return fmt.Errorf("%w: schema is not set in PageIndexBuilder", arrow.ErrInvalid)
	}
	if col < 0 || col >= b.Schema.NumColumns() {
		return fmt.Errorf("%w: invalid column ordinal %d", arrow.ErrInvalid, col)
	}
	if len(b.colIndexBuilders) == 0 || len(b.offsetIndexBuilders) == 0 {
		return fmt.Errorf("%w: No row group appended to PageIndexBuilder", arrow.ErrInvalid)
	}
	return nil
}
// getColumnMetaEncryptor returns the metadata encryptor for column colOrdinal,
// with its AAD updated for moduleType in row group rgOrdinal. Ordinals are
// narrowed to int16 for the AAD. It returns nil when the builder has no file
// encryptor; otherwise it returns exactly what the file encryptor provides.
func (b *PageIndexBuilder) getColumnMetaEncryptor(rgOrdinal, colOrdinal int, moduleType int8) encryption.Encryptor {
	if b.Encryptor == nil {
		return nil
	}

	enc := b.Encryptor.GetColumnMetaEncryptor(b.Schema.Column(colOrdinal).Path())
	if enc != nil {
		aad := encryption.CreateModuleAad(enc.FileAad(), moduleType,
			int16(rgOrdinal), int16(colOrdinal), encryption.NonPageOrdinal)
		enc.UpdateAad(aad)
	}
	return enc
}
// serializeIndex writes one kind of page index for every row group to w,
// row group by row group and column by column.
//
// For each builder that is non-nil and writes at least one byte, it records
// the write position and length in location[rg][col]. A row group gets an
// entry in location only if at least one of its builders wrote something.
// encFn supplies the encryptor for each (row group, column, moduleType).
// It fails on a write error or when a single index exceeds math.MaxInt32 bytes.
func serializeIndex[T interface {
	comparable
	WriteTo(io.Writer, encryption.Encryptor) (int, error)
}](s *schema.Schema, bldrs [][]T, w utils.WriterTell, location map[uint64][]*IndexLocation, moduleType int8, encFn func(int, int, int8) encryption.Encryptor) error {
	var none T
	numCols := s.NumColumns()

	for rg := range bldrs {
		rowGroup := bldrs[rg]
		debug.Assert(len(rowGroup) == numCols, "column index builders length mismatch")

		locs := make([]*IndexLocation, numCols)
		written := false
		for col := range rowGroup {
			bldr := rowGroup[col]
			if bldr == none {
				// no builder was requested for this column
				continue
			}

			enc := encFn(rg, col, moduleType)
			start := w.Tell()
			n, err := bldr.WriteTo(w, enc)
			switch {
			case err != nil:
				return err
			case n == 0:
				// unfinished or discarded index: nothing to locate
				continue
			case n > math.MaxInt32:
				return fmt.Errorf("%w: serialized page index size overflows INT32_MAX", arrow.ErrInvalid)
			}

			locs[col] = &IndexLocation{Offset: start, Length: int32(n)}
			written = true
		}

		if written {
			location[uint64(rg)] = locs
		}
	}
	return nil
}