serializedPageReader.Next()

in parquet/file/page_reader.go [674:839]

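Next advances the reader to the next page in the column chunk: it reads and validates the page header, decompresses the payload, and materializes a DictionaryPage, DataPageV1, or DataPageV2, skipping page types it does not recognize. It returns false at the end of the chunk or on error (recorded in p.err).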

func (p *serializedPageReader) Next() bool {
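	// release the previously returned page and reset per-call state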
	if p.curPage != nil {
		p.curPage.Release()
	}
	p.curPage = nil
	p.curPageHdr = format.NewPageHeader()
	p.err = nil

	// Loop here because there may be unhandled page types that we skip until
	// finding a page that we do know what to do with
	for p.rowsSeen < p.nrows {
		p.decompressBuffer.Reset()
		if err := p.readPageHeader(p.r, p.curPageHdr); err != nil {
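			// plain EOF just means we ran out of pages cleanly, so it is not an error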
			if err != io.EOF {
				p.err = err
			}

			return false
		}

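		// sizes come from the untrusted Thrift header; validate before allocating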
		lenCompressed := int(p.curPageHdr.GetCompressedPageSize())
		lenUncompressed := int(p.curPageHdr.GetUncompressedPageSize())
		if lenCompressed < 0 || lenUncompressed < 0 {
			p.err = xerrors.New("parquet: invalid page header (negative page size)")
			return false
		}

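		// encrypted columns need the decryptor refreshed with this page's AAD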
		if p.cryptoCtx.DataDecryptor != nil {
			p.updateDecryption(p.cryptoCtx.DataDecryptor, encryption.DictPageModule, p.dataPageAad)
		}

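		// scratch buffer sized for the uncompressed page; the V2 case retains it
		// below, while dictionary and V1 pages wrap the decompressed slice directly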
		buf := memory.NewResizableBuffer(p.mem)
		defer buf.Release()
		buf.ResizeNoShrink(lenUncompressed)

		switch p.curPageHdr.GetType() {
		case format.PageType_DICTIONARY_PAGE:
			p.cryptoCtx.StartDecryptWithDictionaryPage = false
			dictHeader := p.curPageHdr.GetDictionaryPageHeader()
			if dictHeader.GetNumValues() < 0 {
				p.err = xerrors.New("parquet: invalid page header (negative number of values)")
				return false
			}

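			// read lenCompressed bytes from the stream and decompress into buf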
			data, err := p.decompress(p.r, lenCompressed, buf.Bytes())
			if err != nil {
				p.err = err
				return false
			}
			if len(data) != lenUncompressed {
				p.err = fmt.Errorf("parquet: metadata said %d bytes uncompressed dictionary page, got %d bytes", lenUncompressed, len(data))
				return false
			}

			// wrap the decompressed values in a DictionaryPage for the caller
			p.curPage = &DictionaryPage{
				page: page{
					buf:      memory.NewBufferBytes(data),
					typ:      p.curPageHdr.Type,
					nvals:    dictHeader.GetNumValues(),
					encoding: dictHeader.GetEncoding(),
				},
				sorted: dictHeader.IsSetIsSorted() && dictHeader.GetIsSorted(),
			}

		case format.PageType_DATA_PAGE:
			p.pageOrd++
			dataHeader := p.curPageHdr.GetDataPageHeader()
			if dataHeader.GetNumValues() < 0 {
				p.err = xerrors.New("parquet: invalid page header (negative number of values)")
				return false
			}

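			// remember which row this page starts at within the row group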
			firstRowIdx := p.rowsSeen
			p.rowsSeen += int64(dataHeader.GetNumValues())
			data, err := p.decompress(p.r, lenCompressed, buf.Bytes())
			if err != nil {
				p.err = err
				return false
			}
			if len(data) != lenUncompressed {
				p.err = fmt.Errorf("parquet: metadata said %d bytes uncompressed data page, got %d bytes", lenUncompressed, len(data))
				return false
			}

			// construct a DataPageV1 over the decompressed bytes
			p.curPage = &DataPageV1{
				page: page{
					buf:      memory.NewBufferBytes(data),
					typ:      p.curPageHdr.Type,
					nvals:    dataHeader.GetNumValues(),
					encoding: dataHeader.GetEncoding(),
				},
				defLvlEncoding:   dataHeader.GetDefinitionLevelEncoding(),
				repLvlEncoding:   dataHeader.GetRepetitionLevelEncoding(),
				uncompressedSize: int32(lenUncompressed),
				statistics:       extractStats(dataHeader),
				firstRowIndex:    firstRowIdx,
			}
		case format.PageType_DATA_PAGE_V2:
			p.pageOrd++
			dataHeader := p.curPageHdr.GetDataPageHeaderV2()
			if dataHeader.GetNumValues() < 0 {
				p.err = xerrors.New("parquet: invalid page header (negative number of values)")
				return false
			}

			if dataHeader.GetDefinitionLevelsByteLength() < 0 || dataHeader.GetRepetitionLevelsByteLength() < 0 {
				p.err = xerrors.New("parquet: invalid page header (negative levels byte length)")
				return false
			}

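			// V2 pages always store rep/def levels uncompressed, ahead of the
			// (optionally compressed) values section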
			compressed := dataHeader.GetIsCompressed()
			// track the row range this page covers within the row group
			firstRowIdx := p.rowsSeen
			p.rowsSeen += int64(dataHeader.GetNumRows())
			levelsBytelen, ok := utils.Add(int(dataHeader.GetDefinitionLevelsByteLength()), int(dataHeader.GetRepetitionLevelsByteLength()))
			if !ok {
				p.err = xerrors.New("parquet: levels size too large (corrupt file?)")
				return false
			}

			if compressed {
				if levelsBytelen > 0 {
					if _, p.err = io.ReadFull(p.r, buf.Bytes()[:levelsBytelen]); p.err != nil {
						return false
					}
				}
				if _, p.err = p.decompress(p.r, lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err != nil {
					return false
				}
			} else {
				if _, p.err = io.ReadFull(p.r, buf.Bytes()); p.err != nil {
					return false
				}
			}
			buf.Retain()

			if buf.Len() != lenUncompressed {
				p.err = fmt.Errorf("parquet: metadata said %d bytes uncompressed data page, got %d bytes", lenUncompressed, buf.Len())
				return false
			}

			// construct a DataPageV2; buf was retained above, so the page owns a reference
			p.curPage = &DataPageV2{
				page: page{
					buf:      buf,
					typ:      p.curPageHdr.Type,
					nvals:    dataHeader.GetNumValues(),
					encoding: dataHeader.GetEncoding(),
				},
				nulls:            dataHeader.GetNumNulls(),
				nrows:            dataHeader.GetNumRows(),
				defLvlByteLen:    dataHeader.GetDefinitionLevelsByteLength(),
				repLvlByteLen:    dataHeader.GetRepetitionLevelsByteLength(),
				compressed:       compressed,
				uncompressedSize: int32(lenUncompressed),
				statistics:       extractStats(dataHeader),
				firstRowIndex:    firstRowIdx,
			}
		default:
			// unknown page type: we're allowed to skip non-data pages, but we
			// must consume the payload so the next header read stays aligned
			if _, err := io.CopyN(io.Discard, p.r, int64(lenCompressed)); err != nil {
				p.err = err
				return false
			}
			continue
		}

		return true
	}

	return false
}
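
For context, callers drive this method as an iterator. Below is a minimal sketch of the consuming loop, assuming the surrounding PageReader interface exposes Page and Err alongside Next (as the rest of this file suggests); consumePages is a hypothetical helper, not part of the package:

func consumePages(rdr PageReader) error {
	for rdr.Next() {
		// dispatch on the concrete page type produced by Next above
		switch rdr.Page().(type) {
		case *DictionaryPage:
			// decode the dictionary before any data pages that reference it
		case *DataPageV1, *DataPageV2:
			// hand the page to the column decoder
		}
	}
	// Next returning false means end-of-chunk or failure; Err distinguishes
	return rdr.Err()
}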