in be/src/exec/parquet/hdfs-parquet-scanner.cc [1839:2003]
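// Evaluates dictionary filter conjuncts and single-column runtime filters against the
// dictionaries of the row group's columns. If, for some filterable column, no dictionary
// entry (and no NULL value, when applicable) passes the filters, no row in the row group
// can match and '*row_group_eliminated' is set to true.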
Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_group,
bool* row_group_eliminated) {
*row_group_eliminated = false;
// Check if there's anything to do here.
if (dict_filterable_readers_.empty()) return Status::OK();
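// 'dict_filterable_readers_' holds the scalar column readers that were classified as
// candidates for dictionary filtering when the scanner was initialized.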
// Legacy Impala files (written by Impala < 2.9) require special handling, because
// they do not encode information about whether each column is 100% dictionary encoded.
bool is_legacy_impala = false;
if (file_version_.application == "impala" && file_version_.VersionLt(2,9,0)) {
is_legacy_impala = true;
}
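// 'file_version_' is parsed from the Parquet footer's 'created_by' field, which records
// the writer application and its version.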
// Keeps track of column readers whose dictionary initialization is deferred: if a
// column cannot be used for dictionary filtering, its dictionary is initialized only
// once we know the row group cannot be eliminated.
vector<BaseScalarColumnReader*> deferred_dict_init_list;
// Keeps track of the initialized tuple associated with a TupleDescriptor.
unordered_map<const TupleDescriptor*, Tuple*> tuple_map;
for (BaseScalarColumnReader* scalar_reader : dict_filterable_readers_) {
const parquet::ColumnMetaData& col_metadata =
row_group.columns[scalar_reader->col_idx()].meta_data;
// Columns in legacy Impala files cannot be skipped here, because the only way to
// determine whether such a column is 100% dictionary encoded is to read the
// dictionary itself.
if (!is_legacy_impala && !IsDictionaryEncoded(col_metadata)) {
// We cannot guarantee that this column is 100% dictionary encoded, so dictionary
// filters cannot be used. Defer initializing its dictionary until after the other
// columns' filters have been evaluated.
deferred_dict_init_list.push_back(scalar_reader);
continue;
}
RETURN_IF_ERROR(scalar_reader->InitDictionary());
DictDecoderBase* dictionary = scalar_reader->GetDictionaryDecoder();
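// A null decoder typically means the column chunk has no dictionary page, so there is
// nothing to filter on for this column.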
if (!dictionary) continue;
// Legacy (version < 2.9) Impala files do not spill to PLAIN encoding until the
// dictionary reaches its maximum number of entries. If the dictionary has fewer
// entries than that limit, the column is guaranteed to be 100% dictionary encoded.
if (is_legacy_impala &&
dictionary->num_entries() >= LEGACY_IMPALA_MAX_DICT_ENTRIES) continue;
const SlotDescriptor* slot_desc = scalar_reader->slot_desc();
DCHECK(slot_desc != nullptr);
const TupleDescriptor* tuple_desc = slot_desc->parent();
auto dict_filter_it = dict_filter_map_.find(slot_desc->id());
const vector<ScalarExprEvaluator*>* dict_filter_conjunct_evals =
dict_filter_it != dict_filter_map_.end()
? &(dict_filter_it->second) : nullptr;
auto runtime_filter_it = single_col_filter_ctxs_.find(slot_desc->id());
const vector<const FilterContext*>* runtime_filters =
runtime_filter_it != single_col_filter_ctxs_.end()
? &runtime_filter_it->second : nullptr;
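// Every dict-filterable reader must have at least one dictionary filter conjunct or one
// single-column runtime filter; otherwise it would not have been classified as such.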
DCHECK(runtime_filters != nullptr || dict_filter_conjunct_evals != nullptr);
Tuple* dict_filter_tuple = nullptr;
auto dict_filter_tuple_it = tuple_map.find(tuple_desc);
if (dict_filter_tuple_it == tuple_map.end()) {
auto tuple_it = dict_filter_tuple_map_.find(tuple_desc);
DCHECK(tuple_it != dict_filter_tuple_map_.end());
dict_filter_tuple = tuple_it->second;
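// Tuple::Init() zeroes the tuple's memory, including its null indicators. Each tuple is
// initialized once per TupleDescriptor and cached for reuse by subsequent columns.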
dict_filter_tuple->Init(tuple_desc->byte_size());
tuple_map[tuple_desc] = dict_filter_tuple;
} else {
dict_filter_tuple = dict_filter_tuple_it->second;
}
DCHECK(dict_filter_tuple != nullptr);
void* slot = dict_filter_tuple->GetSlot(slot_desc->tuple_offset());
bool column_has_match = false;
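// Evaluating runtime filters over a very large dictionary can cost more than the
// potential row-group skip saves, so the number of entries is capped by a query option.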
bool should_eval_runtime_filter = dictionary->num_entries() <=
state_->query_options().parquet_dictionary_runtime_filter_entry_limit;
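// Tracks how many runtime filters are actually evaluated below, so that only those
// filters have their 'processed' stat incremented after the loop.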
int runtime_filters_processed = 0;
for (int dict_idx = 0; dict_idx < dictionary->num_entries(); ++dict_idx) {
if (dict_idx % 1024 == 0) {
// Don't let expr result allocations accumulate too much for large dictionaries or
// many row groups.
context_->expr_results_pool()->Clear();
}
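// Materialize the dictionary entry into the scratch tuple's slot so that the conjuncts
// and runtime filters can evaluate it like a regular row.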
dictionary->GetValue(dict_idx, slot);
// If any dictionary value passes the conjuncts and runtime filters, then move on to
// the next column.
TupleRow row;
row.SetTuple(0, dict_filter_tuple);
if (dict_filter_conjunct_evals != nullptr
&& !ExecNode::EvalConjuncts(dict_filter_conjunct_evals->data(),
dict_filter_conjunct_evals->size(), &row)) {
continue; // Failed the conjunct check, move on to the next entry.
}
column_has_match = true; // The entry passed the conjuncts; runtime filters may still reject it.
if (runtime_filters != nullptr && should_eval_runtime_filter) {
for (int rf_idx = 0; rf_idx < runtime_filters->size(); rf_idx++) {
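// Remember the highest filter index evaluated for any dictionary entry; it is used
// for the processed stat update below.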
if (runtime_filters_processed <= rf_idx) runtime_filters_processed++;
if (!runtime_filters->at(rf_idx)->Eval(&row)) {
column_has_match = false;
break;
}
}
if (column_has_match) {
break; // Passed the conjuncts and no runtime filter rejected the entry.
}
} else {
break; // Passed the conjuncts; runtime filters are absent or skipped.
}
}
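// At this point 'column_has_match' is true iff some dictionary entry passed both the
// conjuncts and all evaluated runtime filters.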
// NULL values are not stored in the Parquet dictionary, so evaluate the filters
// against a NULL value separately. If the writer did not record a null_count
// statistic, conservatively assume the column may contain NULLs.
bool has_set_null_count =
col_metadata.__isset.statistics && col_metadata.statistics.__isset.null_count;
bool should_eval_null_value =
!has_set_null_count || col_metadata.statistics.null_count > 0;
if (!column_has_match && should_eval_null_value) {
dict_filter_tuple->SetNull(slot_desc->null_indicator_offset());
TupleRow row;
row.SetTuple(0, dict_filter_tuple);
// Although the FE guarantees that dictionary filter conjuncts evaluate to false
// on NULL, the conjuncts are still evaluated here for safety.
if (dict_filter_conjunct_evals == nullptr
|| ExecNode::EvalConjuncts(dict_filter_conjunct_evals->data(),
dict_filter_conjunct_evals->size(), &row)) {
column_has_match = true;
if (runtime_filters != nullptr && should_eval_runtime_filter) {
for (int rf_idx = 0; rf_idx < runtime_filters->size(); rf_idx++) {
if (!runtime_filters->at(rf_idx)->Eval(&row)) {
column_has_match = false;
break;
}
}
}
}
}
// Free all expr result allocations now that we're done with the filter.
context_->expr_results_pool()->Clear();
// Although the accepted/rejected runtime filter stats cannot be updated
// meaningfully here, the processed stat can be.
if (runtime_filters != nullptr) {
for (int rf_idx = 0; rf_idx < runtime_filters_processed; rf_idx++) {
runtime_filters->at(rf_idx)->stats->IncrCounters(
FilterStats::ROW_GROUPS_KEY, 1, 1, 0);
}
}
// This column contains no value (and no NULL, if applicable) that passes the
// conjuncts and runtime filters, so the row group can be eliminated.
if (!column_has_match) {
*row_group_eliminated = true;
COUNTER_ADD(num_dict_filtered_row_groups_counter_, 1);
return Status::OK();
}
}
// Any columns that were not 100% dictionary encoded had their dictionary
// initialization deferred above; initialize those dictionaries now.
RETURN_IF_ERROR(BaseScalarColumnReader::InitDictionaries(deferred_dict_init_list));
return Status::OK();
}