in parquet/file/page_reader.go [674:839]
// Next advances the reader to the next page in the column chunk, skipping any
// page types it does not understand. It returns true when p.curPage holds a
// usable page, and false on end-of-chunk or error (check p.err; a clean EOF
// leaves p.err nil).
func (p *serializedPageReader) Next() bool {
	// Loop here because there may be unhandled page types that we skip until
	// finding a page that we do know what to do with
	if p.curPage != nil {
		p.curPage.Release()
	}
	p.curPage = nil
	p.curPageHdr = format.NewPageHeader()
	p.err = nil

	for p.rowsSeen < p.nrows {
		p.decompressBuffer.Reset()
		if err := p.readPageHeader(p.r, p.curPageHdr); err != nil {
			// EOF here simply means the chunk ended; only propagate real errors.
			if err != io.EOF {
				p.err = err
			}
			return false
		}

		lenCompressed := int(p.curPageHdr.GetCompressedPageSize())
		lenUncompressed := int(p.curPageHdr.GetUncompressedPageSize())
		if lenCompressed < 0 || lenUncompressed < 0 {
			p.err = errors.New("parquet: invalid page header")
			return false
		}

		if p.cryptoCtx.DataDecryptor != nil {
			// NOTE(review): DictPageModule is paired with dataPageAad here for
			// every page type — looks inconsistent; confirm against the
			// encryption spec before changing.
			p.updateDecryption(p.cryptoCtx.DataDecryptor, encryption.DictPageModule, p.dataPageAad)
		}

		// NOTE(review): this defer is registered per loop iteration, so releases
		// for skipped (unknown-type) pages accumulate until Next returns.
		buf := memory.NewResizableBuffer(p.mem)
		defer buf.Release()
		buf.ResizeNoShrink(lenUncompressed)

		switch p.curPageHdr.GetType() {
		case format.PageType_DICTIONARY_PAGE:
			p.cryptoCtx.StartDecryptWithDictionaryPage = false
			dictHeader := p.curPageHdr.GetDictionaryPageHeader()
			if dictHeader.GetNumValues() < 0 {
				p.err = xerrors.New("parquet: invalid page header (negative number of values)")
				return false
			}

			data, err := p.decompress(p.r, lenCompressed, buf.Bytes())
			if err != nil {
				p.err = err
				return false
			}
			if len(data) != lenUncompressed {
				p.err = fmt.Errorf("parquet: metadata said %d bytes uncompressed dictionary page, got %d bytes", lenUncompressed, len(data))
				return false
			}

			// make dictionary page
			p.curPage = &DictionaryPage{
				page: page{
					buf:      memory.NewBufferBytes(data),
					typ:      p.curPageHdr.Type,
					nvals:    dictHeader.GetNumValues(),
					encoding: dictHeader.GetEncoding(),
				},
				sorted: dictHeader.IsSetIsSorted() && dictHeader.GetIsSorted(),
			}

		case format.PageType_DATA_PAGE:
			p.pageOrd++
			dataHeader := p.curPageHdr.GetDataPageHeader()
			if dataHeader.GetNumValues() < 0 {
				p.err = xerrors.New("parquet: invalid page header (negative number of values)")
				return false
			}

			firstRowIdx := p.rowsSeen
			p.rowsSeen += int64(dataHeader.GetNumValues())
			data, err := p.decompress(p.r, lenCompressed, buf.Bytes())
			if err != nil {
				p.err = err
				return false
			}
			if len(data) != lenUncompressed {
				p.err = fmt.Errorf("parquet: metadata said %d bytes uncompressed data page, got %d bytes", lenUncompressed, len(data))
				return false
			}

			// make datapagev1
			p.curPage = &DataPageV1{
				page: page{
					buf:      memory.NewBufferBytes(data),
					typ:      p.curPageHdr.Type,
					nvals:    dataHeader.GetNumValues(),
					encoding: dataHeader.GetEncoding(),
				},
				defLvlEncoding:   dataHeader.GetDefinitionLevelEncoding(),
				repLvlEncoding:   dataHeader.GetRepetitionLevelEncoding(),
				uncompressedSize: int32(lenUncompressed),
				statistics:       extractStats(dataHeader),
				firstRowIndex:    firstRowIdx,
			}

		case format.PageType_DATA_PAGE_V2:
			p.pageOrd++
			dataHeader := p.curPageHdr.GetDataPageHeaderV2()
			if dataHeader.GetNumValues() < 0 {
				p.err = xerrors.New("parquet: invalid page header (negative number of values)")
				return false
			}
			if dataHeader.GetDefinitionLevelsByteLength() < 0 || dataHeader.GetRepetitionLevelsByteLength() < 0 {
				p.err = xerrors.New("parquet: invalid page header (negative levels byte length)")
				return false
			}

			compressed := dataHeader.GetIsCompressed()
			// extract stats
			firstRowIdx := p.rowsSeen
			p.rowsSeen += int64(dataHeader.GetNumRows())

			levelsBytelen, ok := utils.Add(int(dataHeader.GetDefinitionLevelsByteLength()), int(dataHeader.GetRepetitionLevelsByteLength()))
			if !ok {
				p.err = xerrors.New("parquet: levels size too large (corrupt file?)")
				return false
			}
			// Guard the slice expressions below: a corrupt header could claim
			// more level bytes than the page itself holds and panic.
			if levelsBytelen > lenUncompressed {
				p.err = xerrors.New("parquet: levels size larger than page size (corrupt file?)")
				return false
			}

			if compressed {
				// In V2 the rep/def levels are stored uncompressed ahead of the
				// compressed values; read them verbatim, then decompress the rest.
				if levelsBytelen > 0 {
					if _, err := io.ReadFull(p.r, buf.Bytes()[:levelsBytelen]); err != nil {
						p.err = err
						return false
					}
				}
				if _, p.err = p.decompress(p.r, lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err != nil {
					return false
				}
			} else {
				if _, err := io.ReadFull(p.r, buf.Bytes()); err != nil {
					p.err = err
					return false
				}
			}
			if buf.Len() != lenUncompressed {
				p.err = fmt.Errorf("parquet: metadata said %d bytes uncompressed data page, got %d bytes", lenUncompressed, buf.Len())
				return false
			}
			// Retain only after validation succeeds; the deferred Release then
			// balances this extra reference when the page itself is released.
			buf.Retain()

			// make datapage v2
			p.curPage = &DataPageV2{
				page: page{
					buf:      buf,
					typ:      p.curPageHdr.Type,
					nvals:    dataHeader.GetNumValues(),
					encoding: dataHeader.GetEncoding(),
				},
				nulls:            dataHeader.GetNumNulls(),
				nrows:            dataHeader.GetNumRows(),
				defLvlByteLen:    dataHeader.GetDefinitionLevelsByteLength(),
				repLvlByteLen:    dataHeader.GetRepetitionLevelsByteLength(),
				compressed:       compressed,
				uncompressedSize: int32(lenUncompressed),
				statistics:       extractStats(dataHeader),
				firstRowIndex:    firstRowIdx,
			}

		default:
			// we don't know this page type, we're allowed to skip non-data pages
			continue
		}
		return true
	}
	return false
}