be/src/pipeline/exec/dict_sink_operator.cpp (136 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "dict_sink_operator.h"
#include "common/status.h"
#include "vec/core/block.h"
#include "vec/functions/complex_hash_map_dictionary.h"
#include "vec/functions/dictionary_factory.h"
#include "vec/functions/dictionary_util.h"
#include "vec/functions/ip_address_dictionary.h"
namespace doris::pipeline {
#include "common/compile_check_begin.h"
Status DictSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
auto& p = _parent->cast<DictSinkOperatorX>();
_output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size());
for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i]));
}
return Status::OK();
}
Status DictSinkLocalState::load_dict(RuntimeState* state) {
const auto& p = _parent->cast<DictSinkOperatorX>();
// now key_output_expr_slots size only 1
auto input_block = _dict_input_block.to_block();
for (auto& data : input_block) {
data.column = std::move(*data.column).mutate()->convert_column_if_overflow();
}
vectorized::ColumnsWithTypeAndName key_data;
vectorized::ColumnsWithTypeAndName value_data;
for (long key_expr_id : p._key_output_expr_slots) {
auto key_expr_ctx = _output_vexpr_ctxs[key_expr_id];
int key_column_id = -1;
RETURN_IF_ERROR(key_expr_ctx->execute(&input_block, &key_column_id));
key_data.push_back(input_block.get_by_position(key_column_id));
}
for (size_t i = 0; i < p._value_output_expr_slots.size(); i++) {
auto value_expr_id = p._value_output_expr_slots[i];
auto value_name = p._value_names[i];
auto value_expr_ctx = _output_vexpr_ctxs[value_expr_id];
int value_column_id = -1;
RETURN_IF_ERROR(value_expr_ctx->execute(&input_block, &value_column_id));
auto att_data = input_block.get_by_position(value_column_id);
att_data.name = value_name;
value_data.push_back(att_data);
}
RETURN_IF_ERROR(check_dict_input_data(key_data, value_data, p._skip_null_key));
const auto& dict_name = p._dictionary_name;
vectorized::DictionaryPtr dict = nullptr;
switch (p._layout_type) {
case TDictLayoutType::type::IP_TRIE: {
if (key_data.size() != 1) {
return Status::InvalidArgument("IP_TRIE dict key size must be 1");
}
dict = create_ip_trie_dict_from_column(dict_name, key_data[0], value_data);
break;
}
case TDictLayoutType::type::HASH_MAP: {
dict = create_complex_hash_map_dict_from_column(dict_name, key_data, value_data);
break;
}
default:
return Status::InvalidArgument("Unknown layout type");
}
if (dict == nullptr) {
return Status::InternalError("Failed to create dictionary");
}
if (dict->allocated_bytes() > p._memory_limit) {
return Status::InvalidArgument(
"load dict memory limit exceeded , current memory usage: {} , memory limit: {}",
dict->allocated_bytes(), p._memory_limit);
}
LOG(INFO) << fmt::format("Refresh dictionary {}, version: {}", p._dictionary_id, p._version_id);
RETURN_IF_ERROR(ExecEnv::GetInstance()->dict_factory()->refresh_dict(p._dictionary_id,
p._version_id, dict));
return Status::OK();
}
DictSinkOperatorX::DictSinkOperatorX(int operator_id, const RowDescriptor& row_desc,
const std::vector<TExpr>& dict_input_expr,
const TDictionarySink& dict_sink)
: Base(operator_id, 0, 0),
_dictionary_id(dict_sink.dictionary_id),
_version_id(dict_sink.version_id),
_dictionary_name(dict_sink.dictionary_name),
_layout_type(dict_sink.layout_type),
_key_output_expr_slots(dict_sink.key_output_expr_slots),
_value_output_expr_slots(dict_sink.value_output_expr_slots),
_value_names(dict_sink.value_names),
_row_desc(row_desc),
_t_output_expr(dict_input_expr),
_skip_null_key(dict_sink.skip_null_key),
_memory_limit(dict_sink.memory_limit) {}
Status DictSinkOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Base::prepare(state));
if (_value_output_expr_slots.size() != _value_names.size()) {
return Status::InternalError("value_output_expr_slots.size() != value_names.size()");
}
if (_child->parallel_tasks() != 1) {
return Status::InternalError("DictSinkOperatorX parallel must be 1");
}
// prepare output_expr
// From the thrift expressions create the real exprs.
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs));
// Prepare the exprs to run.
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state));
for (auto key_expr_id : _key_output_expr_slots) {
auto key_expr = _output_vexpr_ctxs[key_expr_id]->root();
if (!key_expr->is_slot_ref()) {
return Status::InvalidArgument(
"DictSinkOperatorX expr must be slot ref , but now is {}",
key_expr->expr_name());
}
}
for (auto value_expr_id : _value_output_expr_slots) {
auto value_expr = _output_vexpr_ctxs[value_expr_id]->root();
if (!value_expr->is_slot_ref()) {
return Status::InvalidArgument(
"DictSinkOperatorX expr must be slot ref , but now is {}",
value_expr->expr_name());
}
}
return Status::OK();
}
Status DictSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->dict_factory()->mem_tracker());
if (local_state._dict_input_block.columns() == 0) {
local_state._dict_input_block =
vectorized::Block(vectorized::VectorizedUtils::create_empty_block(_row_desc));
}
if (in_block->rows() != 0) {
RETURN_IF_ERROR(local_state._dict_input_block.merge_ignore_overflow(std::move(*in_block)));
}
if (eos) {
RETURN_IF_ERROR(local_state.load_dict(state));
}
return Status::OK();
}
} // namespace doris::pipeline
#include "common/compile_check_end.h"