velox/dwio/parquet/reader/ParquetReader.cpp (261 lines of code) (raw):
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/dwio/parquet/reader/ParquetReader.h"
#include "velox/duckdb/conversion/DuckConversion.h"
#include "velox/duckdb/conversion/DuckWrapper.h"
#include "velox/dwio/parquet/reader/Statistics.h"
namespace facebook::velox::parquet {
namespace {
::duckdb::Value makeValue(::duckdb::LogicalType type, int64_t val) {
switch (type.id()) {
case ::duckdb::LogicalTypeId::INTEGER:
return ::duckdb::Value::INTEGER(val);
case ::duckdb::LogicalTypeId::BIGINT:
return ::duckdb::Value::BIGINT(val);
case ::duckdb::LogicalTypeId::DATE:
return ::duckdb::Value::DATE(::duckdb::date_t(val));
default:
VELOX_UNSUPPORTED(
"Unsupported column type for integer filter: {}", type.ToString());
}
}
std::unique_ptr<::duckdb::ConstantFilter> constantFilter(
::duckdb::ExpressionType expressionType,
::duckdb::Value value) {
return std::make_unique<::duckdb::ConstantFilter>(
expressionType, std::move(value));
}
std::unique_ptr<::duckdb::ConstantFilter> constantEqualFilter(
::duckdb::Value value) {
return std::make_unique<::duckdb::ConstantFilter>(
::duckdb::ExpressionType::COMPARE_EQUAL, std::move(value));
}
void toDuckDbFilter(
uint64_t colIdx,
::duckdb::LogicalType type,
common::Filter* filter,
::duckdb::TableFilterSet& filters) {
switch (filter->kind()) {
case common::FilterKind::kBigintRange: {
auto rangeFilter = static_cast<common::BigintRange*>(filter);
if (rangeFilter->isSingleValue()) {
filters.PushFilter(
colIdx, constantEqualFilter(makeValue(type, rangeFilter->lower())));
} else {
if (rangeFilter->lower() != std::numeric_limits<int64_t>::min()) {
filters.PushFilter(
colIdx,
constantFilter(
::duckdb::ExpressionType::COMPARE_GREATERTHANOREQUALTO,
makeValue(type, rangeFilter->lower())));
}
if (rangeFilter->upper() != std::numeric_limits<int64_t>::max()) {
filters.PushFilter(
colIdx,
constantFilter(
::duckdb::ExpressionType::COMPARE_LESSTHANOREQUALTO,
makeValue(type, rangeFilter->upper())));
}
}
break;
}
case common::FilterKind::kDoubleRange: {
auto rangeFilter = static_cast<common::DoubleRange*>(filter);
if (!rangeFilter->lowerUnbounded()) {
auto expressionType = rangeFilter->lowerExclusive()
? ::duckdb::ExpressionType::COMPARE_GREATERTHAN
: ::duckdb::ExpressionType::COMPARE_GREATERTHANOREQUALTO;
filters.PushFilter(
colIdx,
constantFilter(
expressionType, ::duckdb::Value(rangeFilter->lower())));
}
if (!rangeFilter->upperUnbounded()) {
auto expressionType = rangeFilter->upperExclusive()
? ::duckdb::ExpressionType::COMPARE_LESSTHAN
: ::duckdb::ExpressionType::COMPARE_LESSTHANOREQUALTO;
filters.PushFilter(
colIdx,
constantFilter(
expressionType, ::duckdb::Value(rangeFilter->upper())));
}
break;
}
case common::FilterKind::kBytesValues: {
auto valuesFilter = static_cast<common::BytesValues*>(filter);
const auto& values = valuesFilter->values();
if (values.size() == 1) {
filters.PushFilter(colIdx, constantEqualFilter(*values.begin()));
} else {
auto duckFilter = std::make_unique<::duckdb::ConjunctionOrFilter>();
for (const auto& value : values) {
duckFilter->child_filters.push_back(constantEqualFilter(value));
}
filters.PushFilter(colIdx, std::move(duckFilter));
}
break;
}
case common::FilterKind::kBytesRange: {
auto rangeFilter = static_cast<common::BytesRange*>(filter);
if (!rangeFilter->lowerUnbounded()) {
auto expressionType = rangeFilter->lowerExclusive()
? ::duckdb::ExpressionType::COMPARE_GREATERTHAN
: ::duckdb::ExpressionType::COMPARE_GREATERTHANOREQUALTO;
filters.PushFilter(
colIdx,
constantFilter(
expressionType, ::duckdb::Value(rangeFilter->lower())));
}
if (!rangeFilter->upperUnbounded()) {
auto expressionType = rangeFilter->upperExclusive()
? ::duckdb::ExpressionType::COMPARE_LESSTHAN
: ::duckdb::ExpressionType::COMPARE_LESSTHANOREQUALTO;
filters.PushFilter(
colIdx,
constantFilter(
expressionType, ::duckdb::Value(rangeFilter->upper())));
}
break;
}
case common::FilterKind::kAlwaysFalse:
case common::FilterKind::kAlwaysTrue:
case common::FilterKind::kIsNull:
case common::FilterKind::kIsNotNull:
case common::FilterKind::kBoolValue:
case common::FilterKind::kBigintValuesUsingHashTable:
case common::FilterKind::kBigintValuesUsingBitmask:
case common::FilterKind::kFloatRange:
case common::FilterKind::kBigintMultiRange:
case common::FilterKind::kMultiRange:
default:
VELOX_UNSUPPORTED(
"Unsupported filter in parquet reader: {}", filter->toString());
}
}
std::optional<common::Filter*> findFilter(
const std::string& columnName,
const common::ScanSpec* scanSpec) {
const auto& childSpecs = scanSpec->children();
for (const auto& childSpec : childSpecs) {
VELOX_CHECK(
!childSpec->extractValues(),
"Subfield access is NYI in parquet reader");
if (childSpec->fieldName() == columnName && childSpec->filter()) {
return childSpec->filter();
}
}
return std::nullopt;
}
} // anonymous namespace
ParquetRowReader::ParquetRowReader(
std::shared_ptr<::duckdb::ParquetReader> reader,
const dwio::common::RowReaderOptions& options,
memory::MemoryPool& pool)
: reader_(std::move(reader)), pool_(pool) {
auto& selector = *options.getSelector();
rowType_ = selector.buildSelectedReordered();
duckdbRowType_.reserve(rowType_->size());
auto& projection = selector.getProjection();
VELOX_CHECK_EQ(rowType_->size(), projection.size());
std::vector<::duckdb::column_t> columnIds;
columnIds.reserve(rowType_->size());
for (uint64_t i = 0; i < projection.size(); i++) {
uint64_t columnId = projection[i].column;
VELOX_CHECK_LT(
columnId,
reader_->names.size(),
"Unexpected column name: {}",
projection[i].name);
columnIds.push_back(columnId);
// DuckDB ParquetReader::return_types contains all columns present in the
// file.
::duckdb::LogicalType duckDbType = reader_->return_types[columnId];
duckdbRowType_.push_back(duckDbType);
if (options.getScanSpec()) {
auto veloxFilter = findFilter(projection[i].name, options.getScanSpec());
if (veloxFilter) {
toDuckDbFilter(i, duckDbType, veloxFilter.value(), filters_);
}
}
}
std::vector<idx_t> groups;
for (idx_t i = 0; i < reader_->NumRowGroups(); i++) {
auto groupOffset = reader_->GetFileMetadata()->row_groups[i].file_offset;
if (groupOffset >= options.getOffset() &&
groupOffset < (options.getLength() + options.getOffset())) {
groups.push_back(i);
}
}
reader_->InitializeScan(
state_, std::move(columnIds), std::move(groups), &filters_);
}
uint64_t ParquetRowReader::next(uint64_t /*size*/, velox::VectorPtr& result) {
::duckdb::DataChunk output;
output.Initialize(duckdbRowType_);
reader_->Scan(state_, output);
if (output.size() > 0) {
std::vector<VectorPtr> columns;
columns.reserve(output.data.size());
for (int i = 0; i < output.data.size(); i++) {
columns.emplace_back(duckdb::toVeloxVector(
output.size(), output.data[i], rowType_->childAt(i), &pool_));
}
result = std::make_shared<RowVector>(
&pool_,
rowType_,
BufferPtr(nullptr),
output.size(),
columns,
std::nullopt);
}
return output.size();
}
void ParquetRowReader::updateRuntimeStats(
dwio::common::RuntimeStatistics& /*stats*/) const {}
void ParquetRowReader::resetFilterCaches() {
// No filter caches to reset.
}
std::optional<size_t> ParquetRowReader::estimatedRowSize() const {
// TODO Implement.
return std::nullopt;
}
ParquetReader::ParquetReader(
std::unique_ptr<dwio::common::InputStream> stream,
const dwio::common::ReaderOptions& options)
: allocator_(options.getMemoryPool()),
fileSystem_(
std::make_unique<duckdb::InputStreamFileSystem>(std::move(stream))),
reader_(std::make_shared<::duckdb::ParquetReader>(
allocator_,
fileSystem_->OpenFile())),
pool_(options.getMemoryPool()) {
auto names = reader_->names;
std::vector<TypePtr> types;
types.reserve(reader_->return_types.size());
for (auto& t : reader_->return_types) {
types.emplace_back(duckdb::toVeloxType(t));
}
type_ = ROW(std::move(names), std::move(types));
}
std::optional<uint64_t> ParquetReader::numberOfRows() const {
return const_cast<::duckdb::ParquetReader*>(reader_.get())->NumRows();
}
std::unique_ptr<dwio::common::ColumnStatistics> ParquetReader::columnStatistics(
uint32_t /*index*/) const {
// TODO: implement proper stats
return std::make_unique<ColumnStatistics>();
}
const velox::RowTypePtr& ParquetReader::rowType() const {
return type_;
}
const std::shared_ptr<const dwio::common::TypeWithId>&
ParquetReader::typeWithId() const {
if (!typeWithId_) {
typeWithId_ = dwio::common::TypeWithId::create(type_);
}
return typeWithId_;
}
std::unique_ptr<dwio::common::RowReader> ParquetReader::createRowReader(
const dwio::common::RowReaderOptions& options) const {
return std::make_unique<ParquetRowReader>(reader_, options, pool_);
}
void registerParquetReaderFactory() {
dwio::common::registerReaderFactory(std::make_shared<ParquetReaderFactory>());
}
void unregisterParquetReaderFactory() {
dwio::common::unregisterReaderFactory(dwio::common::FileFormat::PARQUET);
}
} // namespace facebook::velox::parquet