Status HdfsParquetScanner::EvalDictionaryFilters()

in be/src/exec/parquet/hdfs-parquet-scanner.cc [1839:2003]
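
EvalDictionaryFilters() decides whether an entire row group can be skipped by
evaluating the scan's conjuncts and runtime filters against each filterable
column's dictionary instead of against every row. A hypothetical call site in
the row-group loop might look like the following (the surrounding names are
assumptions, not taken from the file); a simplified sketch of the elimination
logic itself follows the function body.

  bool row_group_eliminated;
  RETURN_IF_ERROR(EvalDictionaryFilters(row_group, &row_group_eliminated));
  if (row_group_eliminated) continue;  // Move on to the next row group.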


Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_group,
    bool* row_group_eliminated) {
  *row_group_eliminated = false;
  // Check if there's anything to do here.
  if (dict_filterable_readers_.empty()) return Status::OK();

  // Legacy Impala files (written by Impala < 2.9) require special handling, because
  // they do not encode information about whether a column is 100% dictionary encoded
  // (a sketch of how such a check can work for newer files follows this function).
  bool is_legacy_impala = false;
  if (file_version_.application == "impala" && file_version_.VersionLt(2, 9, 0)) {
    is_legacy_impala = true;
  }

  // Keeps track of column readers whose dictionary initialization has been deferred.
  // For example, if a column cannot be used for dictionary filtering, its dictionary
  // is only initialized once we know the row group has not been eliminated.
  vector<BaseScalarColumnReader*> deferred_dict_init_list;
  // Keeps track of the initialized tuple associated with a TupleDescriptor.
  unordered_map<const TupleDescriptor*, Tuple*> tuple_map;
  for (BaseScalarColumnReader* scalar_reader : dict_filterable_readers_) {
    const parquet::ColumnMetaData& col_metadata =
        row_group.columns[scalar_reader->col_idx()].meta_data;

    // Columns in legacy Impala files cannot be ruled out at this point, because the
    // only way to determine whether such a column is 100% dictionary encoded is to
    // read the dictionary itself.
    if (!is_legacy_impala && !IsDictionaryEncoded(col_metadata)) {
      // We cannot guarantee that this column is 100% dictionary encoded, so
      // dictionary filters cannot be used. Defer initializing its dictionary
      // until after the other filters have been evaluated.
      deferred_dict_init_list.push_back(scalar_reader);
      continue;
    }

    RETURN_IF_ERROR(scalar_reader->InitDictionary());
    DictDecoderBase* dictionary = scalar_reader->GetDictionaryDecoder();
    if (!dictionary) continue;

    // Legacy (version < 2.9) Impala files do not spill to PLAIN encoding until
    // the dictionary reaches the maximum number of entries. If the dictionary
    // has fewer entries, then the column is 100% dictionary encoded.
    if (is_legacy_impala &&
        dictionary->num_entries() >= LEGACY_IMPALA_MAX_DICT_ENTRIES) continue;

    const SlotDescriptor* slot_desc = scalar_reader->slot_desc();
    DCHECK(slot_desc != nullptr);
    const TupleDescriptor* tuple_desc = slot_desc->parent();
    auto dict_filter_it = dict_filter_map_.find(slot_desc->id());
    const vector<ScalarExprEvaluator*>* dict_filter_conjunct_evals =
        dict_filter_it != dict_filter_map_.end()
        ? &(dict_filter_it->second) : nullptr;

    auto runtime_filter_it = single_col_filter_ctxs_.find(slot_desc->id());
    const vector<const FilterContext*>* runtime_filters =
        runtime_filter_it != single_col_filter_ctxs_.end()
        ? &runtime_filter_it->second : nullptr;
    DCHECK(runtime_filters != nullptr || dict_filter_conjunct_evals != nullptr);

    Tuple* dict_filter_tuple = nullptr;
    auto dict_filter_tuple_it = tuple_map.find(tuple_desc);
    if (dict_filter_tuple_it == tuple_map.end()) {
      auto tuple_it = dict_filter_tuple_map_.find(tuple_desc);
      DCHECK(tuple_it != dict_filter_tuple_map_.end());
      dict_filter_tuple = tuple_it->second;
      dict_filter_tuple->Init(tuple_desc->byte_size());
      tuple_map[tuple_desc] = dict_filter_tuple;
    } else {
      dict_filter_tuple = dict_filter_tuple_it->second;
    }

    DCHECK(dict_filter_tuple != nullptr);
    void* slot = dict_filter_tuple->GetSlot(slot_desc->tuple_offset());
    bool column_has_match = false;
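    // Runtime filters are only evaluated when the dictionary has no more entries
    // than the parquet_dictionary_runtime_filter_entry_limit query option allows.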
    bool should_eval_runtime_filter = dictionary->num_entries() <=
        state_->query_options().parquet_dictionary_runtime_filter_entry_limit;
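    // Tracks how many of the runtime filters were actually evaluated, so that their
    // "processed" stats can be updated once the dictionary scan is done.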
    int runtime_filters_processed = 0;
    for (int dict_idx = 0; dict_idx < dictionary->num_entries(); ++dict_idx) {
      if (dict_idx % 1024 == 0) {
        // Don't let expr result allocations accumulate too much for large dictionaries or
        // many row groups.
        context_->expr_results_pool()->Clear();
      }
      dictionary->GetValue(dict_idx, slot);

      // If any dictionary value passes the conjuncts and runtime filters, then move on to
      // the next column.
      TupleRow row;
      row.SetTuple(0, dict_filter_tuple);
      if (dict_filter_conjunct_evals != nullptr
          && !ExecNode::EvalConjuncts(dict_filter_conjunct_evals->data(),
              dict_filter_conjunct_evals->size(), &row)) {
        continue; // Failed the conjunct check, move on to the next entry.
      }
      column_has_match = true; // Passed the conjuncts; runtime filters may still veto.
      if (runtime_filters != nullptr && should_eval_runtime_filter) {
        for (int rf_idx = 0; rf_idx < runtime_filters->size(); rf_idx++) {
          if (runtime_filters_processed <= rf_idx) runtime_filters_processed++;
          if (!runtime_filters->at(rf_idx)->Eval(&row)) {
            column_has_match = false;
            break;
          }
        }
        if (column_has_match) {
          break; // Passed the conjuncts and no runtime filter rejected the entry.
        }
      } else {
        break; // Passed the conjuncts; runtime filters are absent or skipped.
      }
    }

    // NULL values are not included in the Parquet dictionary, so if the column may
    // contain NULLs, evaluate the filters against NULL as well.
    bool has_set_null_count =
        col_metadata.__isset.statistics && col_metadata.statistics.__isset.null_count;
    bool should_eval_null_value = !has_set_null_count
        || col_metadata.statistics.null_count > 0;
    if (!column_has_match && should_eval_null_value) {
      dict_filter_tuple->SetNull(slot_desc->null_indicator_offset());
      TupleRow row;
      row.SetTuple(0, dict_filter_tuple);
      // Although the FE guarantees that dict_filter_conjunct evaluates
      // to false on NULL, this condition is added for safety.
      if (dict_filter_conjunct_evals == nullptr
          || ExecNode::EvalConjuncts(dict_filter_conjunct_evals->data(),
              dict_filter_conjunct_evals->size(), &row)) {
        column_has_match = true;
        if (runtime_filters != nullptr && should_eval_runtime_filter) {
          for (int rf_idx = 0; rf_idx < runtime_filters->size(); rf_idx++) {
            if (!runtime_filters->at(rf_idx)->Eval(&row)) {
              column_has_match = false;
              break;
            }
          }
        }
      }
    }

    // Free all expr result allocations now that we're done with the filter.
    context_->expr_results_pool()->Clear();

    // Although the accepted and rejected runtime filter stats cannot be updated
    // meaningfully here, the processed stat can be.
    if (runtime_filters != nullptr) {
      for (int rf_idx = 0; rf_idx < runtime_filters_processed; rf_idx++) {
        runtime_filters->at(rf_idx)->stats->IncrCounters(
            FilterStats::ROW_GROUPS_KEY, 1, 1, 0);
      }
    }

    // This column contains no value that passes the conjuncts and runtime filters,
    // so this row group can be eliminated.
    if (!column_has_match) {
      *row_group_eliminated = true;
      COUNTER_ADD(num_dict_filtered_row_groups_counter_, 1);
      return Status::OK();
    }
  }

  // Columns that were not 100% dictionary encoded had their dictionary
  // initialization deferred; initialize those dictionaries here.
  RETURN_IF_ERROR(BaseScalarColumnReader::InitDictionaries(deferred_dict_init_list));

  return Status::OK();
}
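
The elimination logic above interleaves tuple materialization, conjunct
evaluation, runtime filtering and stats updates. Stripped of those details,
the core idea is small: once a column is known to be 100% dictionary encoded,
evaluating the filters once per dictionary entry, plus once for NULL, is
enough to decide whether any row in the group could match. Below is a minimal,
self-contained sketch of that idea; it is not Impala code, and Value, Conjunct
and RowGroupEliminated are hypothetical stand-ins.

#include <functional>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-ins: a decoded dictionary, and a conjunct over a
// possibly-NULL value (std::nullopt models SQL NULL).
using Value = std::optional<std::string>;
using Conjunct = std::function<bool(const Value&)>;

static bool PassesAll(const std::vector<Conjunct>& conjuncts, const Value& v) {
  for (const Conjunct& c : conjuncts) {
    if (!c(v)) return false;
  }
  return true;
}

// Returns true if the row group can be skipped: no dictionary entry, and no
// NULL (when the column may contain NULLs), satisfies every conjunct.
static bool RowGroupEliminated(const std::vector<std::string>& dictionary,
    bool may_contain_nulls, const std::vector<Conjunct>& conjuncts) {
  for (const std::string& entry : dictionary) {
    if (PassesAll(conjuncts, entry)) return false;  // One match keeps the group.
  }
  // NULLs are not stored in the Parquet dictionary, so test them separately.
  if (may_contain_nulls && PassesAll(conjuncts, std::nullopt)) return false;
  return true;
}

int main() {
  // E.g. WHERE color = 'red': NULL and non-'red' entries fail the conjunct.
  std::vector<Conjunct> conjuncts = {
      [](const Value& v) { return v.has_value() && *v == "red"; }};
  // A row group whose dictionary is {"blue", "green"} is eliminated.
  return RowGroupEliminated({"blue", "green"}, /*may_contain_nulls=*/true,
      conjuncts) ? 0 : 1;
}

The IsDictionaryEncoded(col_metadata) helper referenced above is not part of
this excerpt. A plausible sketch of such a check, assuming it is based on the
optional encoding_stats field of parquet.thrift's ColumnMetaData (an
assumption, not necessarily Impala's actual implementation):

// Hypothetical: true iff every data page of the column is dictionary encoded.
bool AllDataPagesDictEncoded(const parquet::ColumnMetaData& col_metadata) {
  if (!col_metadata.__isset.encoding_stats) return false;  // Unknown: assume not.
  for (const parquet::PageEncodingStats& stats : col_metadata.encoding_stats) {
    if (stats.page_type != parquet::PageType::DATA_PAGE) continue;
    if (stats.encoding != parquet::Encoding::PLAIN_DICTIONARY
        && stats.encoding != parquet::Encoding::RLE_DICTIONARY) {
      return false;  // A data page spilled to a non-dictionary encoding.
    }
  }
  return true;
}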