be/src/vec/columns/column_object.cpp (1,791 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // This file is copied from // https://github.com/ClickHouse/ClickHouse/blob/master/src/Columns/ColumnObject.cpp // and modified by Doris #include "vec/columns/column_object.h" #include <assert.h> #include <fmt/core.h> #include <fmt/format.h> #include <glog/logging.h> #include <parallel_hashmap/phmap.h> #include <rapidjson/stringbuffer.h> #include <rapidjson/writer.h> #include <algorithm> #include <cstdlib> #include <functional> #include <limits> #include <map> #include <memory> #include <optional> #include <sstream> #include <vector> #include "common/compiler_util.h" // IWYU pragma: keep #include "common/exception.h" #include "common/logging.h" #include "common/status.h" #include "exprs/json_functions.h" #include "olap/olap_common.h" #include "util/defer_op.h" #include "util/simd/bits.h" #include "vec/aggregate_functions/aggregate_function.h" #include "vec/aggregate_functions/helpers.h" #include "vec/columns/column.h" #include "vec/columns/column_array.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_string.h" #include "vec/columns/column_vector.h" #include "vec/columns/columns_number.h" #include "vec/common/arena.h" #include "vec/common/assert_cast.h" #include 
"vec/common/field_visitors.h" #include "vec/common/schema_util.h" #include "vec/common/string_buffer.hpp" #include "vec/common/string_ref.h" #include "vec/core/column_with_type_and_name.h" #include "vec/core/field.h" #include "vec/core/types.h" #include "vec/data_types/convert_field_to_type.h" #include "vec/data_types/data_type.h" #include "vec/data_types/data_type_array.h" #include "vec/data_types/data_type_decimal.h" #include "vec/data_types/data_type_factory.hpp" #include "vec/data_types/data_type_jsonb.h" #include "vec/data_types/data_type_nothing.h" #include "vec/data_types/data_type_nullable.h" #include "vec/data_types/data_type_object.h" #include "vec/data_types/get_least_supertype.h" #include "vec/json/path_in_data.h" #ifdef __AVX2__ #include "util/jsonb_parser_simd.h" #else #include "util/jsonb_parser.h" #endif namespace doris::vectorized { #include "common/compile_check_begin.h" namespace { DataTypePtr create_array_of_type(TypeIndex type, size_t num_dimensions, bool is_nullable) { if (type == ColumnObject::MOST_COMMON_TYPE_ID) { // JSONB type MUST NOT wrapped in ARRAY column, it should be top level. // So we ignored num_dimensions. return is_nullable ? make_nullable(std::make_shared<ColumnObject::MostCommonType>()) : std::make_shared<ColumnObject::MostCommonType>(); } DataTypePtr result = DataTypeFactory::instance().create_data_type(type, is_nullable); for (size_t i = 0; i < num_dimensions; ++i) { result = std::make_shared<DataTypeArray>(result); if (is_nullable) { // wrap array with nullable result = make_nullable(result); } } return result; } DataTypePtr get_base_type_of_array(const DataTypePtr& type) { /// Get raw pointers to avoid extra copying of type pointers. 
const DataTypeArray* last_array = nullptr; const auto* current_type = type.get(); if (const auto* nullable = typeid_cast<const DataTypeNullable*>(current_type)) { current_type = nullable->get_nested_type().get(); } while (const auto* type_array = typeid_cast<const DataTypeArray*>(current_type)) { current_type = type_array->get_nested_type().get(); last_array = type_array; if (const auto* nullable = typeid_cast<const DataTypeNullable*>(current_type)) { current_type = nullable->get_nested_type().get(); } } return last_array ? last_array->get_nested_type() : type; } size_t get_number_of_dimensions(const IDataType& type) { int num_dimensions = 0; const auto* current_type = &type; if (const auto* nullable = typeid_cast<const DataTypeNullable*>(current_type)) { current_type = nullable->get_nested_type().get(); } while (const auto* type_array = typeid_cast<const DataTypeArray*>(current_type)) { current_type = type_array->get_nested_type().get(); num_dimensions += 1; if (const auto* nullable = typeid_cast<const DataTypeNullable*>(current_type)) { current_type = nullable->get_nested_type().get(); } } return num_dimensions; } /// Calculates number of dimensions in array field. /// Returns 0 for scalar fields. class FieldVisitorToNumberOfDimensions : public StaticVisitor<size_t> { public: size_t operator()(const Array& x) const { const size_t size = x.size(); size_t dimensions = 0; for (size_t i = 0; i < size; ++i) { size_t element_dimensions = apply_visitor(*this, x[i]); dimensions = std::max(dimensions, element_dimensions); } return 1 + dimensions; } template <typename T> size_t operator()(const T&) const { return 0; } }; // Visitor that allows to get type of scalar field // but exclude fields contain complex field.This is a faster version // for FieldVisitorToScalarType which does not support complex field. 
class SimpleFieldVisitorToScalarType : public StaticVisitor<size_t> {
public:
    // Arrays are rejected outright: this visitor only handles non-array fields.
    size_t operator()(const Array& x) {
        throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Array type is not supported");
    }

    // Unsigned values are reported as the narrowest signed integer type whose maximum
    // is not exceeded; anything above the Int32 maximum is reported as Int64.
    size_t operator()(const UInt64& x) {
        if (x > std::numeric_limits<Int32>::max()) {
            deduced_type = TypeIndex::Int64;
        } else if (x > std::numeric_limits<Int16>::max()) {
            deduced_type = TypeIndex::Int32;
        } else if (x > std::numeric_limits<Int8>::max()) {
            deduced_type = TypeIndex::Int16;
        } else {
            deduced_type = TypeIndex::Int8;
        }
        return 1;
    }

    // Signed values are reported as the narrowest signed integer type containing them.
    size_t operator()(const Int64& x) {
        const bool fits_int8 = x >= std::numeric_limits<Int8>::min() &&
                               x <= std::numeric_limits<Int8>::max();
        const bool fits_int16 = x >= std::numeric_limits<Int16>::min() &&
                                x <= std::numeric_limits<Int16>::max();
        const bool fits_int32 = x >= std::numeric_limits<Int32>::min() &&
                                x <= std::numeric_limits<Int32>::max();
        if (fits_int8) {
            deduced_type = TypeIndex::Int8;
        } else if (fits_int16) {
            deduced_type = TypeIndex::Int16;
        } else if (fits_int32) {
            deduced_type = TypeIndex::Int32;
        } else {
            deduced_type = TypeIndex::Int64;
        }
        return 1;
    }

    size_t operator()(const JsonbField& x) {
        deduced_type = TypeIndex::JSONB;
        return 1;
    }

    // A null only sets the null flag; the deduced type is left as it was.
    size_t operator()(const Null&) {
        null_seen = true;
        return 1;
    }

    size_t operator()(const VariantMap&) {
        deduced_type = TypeIndex::VARIANT;
        return 1;
    }

    // Every remaining field kind maps to the type id of its nearest field type.
    template <typename T>
    size_t operator()(const T&) {
        deduced_type = TypeId<NearestFieldType<T>>::value;
        return 1;
    }

    // Writes the type recorded by the last visit (TypeIndex::Nothing if none was recorded).
    void get_scalar_type(TypeIndex* data_type) const { *data_type = deduced_type; }

    bool contain_nulls() const { return null_seen; }

    // This visitor never requests a field conversion.
    bool need_convert_field() const { return false; }

private:
    TypeIndex deduced_type = TypeIndex::Nothing;
    bool null_seen = false;
};

/// Visitor that allows to get type of scalar field
/// or least common type of scalars in array.
/// More optimized version of FieldToDataType.
class FieldVisitorToScalarType : public StaticVisitor<size_t> { public: using FieldType = Field::Types::Which; size_t operator()(const Array& x) { size_t size = x.size(); for (size_t i = 0; i < size; ++i) { apply_visitor(*this, x[i]); } return 0; } // TODO doris not support unsigned integers for now // treat as signed integers size_t operator()(const UInt64& x) { field_types.insert(FieldType::UInt64); if (x <= std::numeric_limits<Int8>::max()) { type_indexes.insert(TypeIndex::Int8); } else if (x <= std::numeric_limits<Int16>::max()) { type_indexes.insert(TypeIndex::Int16); } else if (x <= std::numeric_limits<Int32>::max()) { type_indexes.insert(TypeIndex::Int32); } else { type_indexes.insert(TypeIndex::Int64); } return 0; } size_t operator()(const Int64& x) { field_types.insert(FieldType::Int64); if (x <= std::numeric_limits<Int8>::max() && x >= std::numeric_limits<Int8>::min()) { type_indexes.insert(TypeIndex::Int8); } else if (x <= std::numeric_limits<Int16>::max() && x >= std::numeric_limits<Int16>::min()) { type_indexes.insert(TypeIndex::Int16); } else if (x <= std::numeric_limits<Int32>::max() && x >= std::numeric_limits<Int32>::min()) { type_indexes.insert(TypeIndex::Int32); } else { type_indexes.insert(TypeIndex::Int64); } return 0; } size_t operator()(const JsonbField& x) { field_types.insert(FieldType::JSONB); type_indexes.insert(TypeIndex::JSONB); return 0; } size_t operator()(const VariantMap&) { field_types.insert(FieldType::VariantMap); type_indexes.insert(TypeIndex::VARIANT); return 0; } size_t operator()(const Null&) { have_nulls = true; return 0; } template <typename T> size_t operator()(const T&) { Field::EnumToType<Field::Types::Array>::Type a; field_types.insert(Field::TypeToEnum<NearestFieldType<T>>::value); type_indexes.insert(TypeId<NearestFieldType<T>>::value); return 0; } void get_scalar_type(TypeIndex* type) const { DataTypePtr data_type; get_least_supertype_jsonb(type_indexes, &data_type); *type = data_type->get_type_id(); } bool 
contain_nulls() const { return have_nulls; } bool need_convert_field() const { return field_types.size() > 1; } private: phmap::flat_hash_set<TypeIndex> type_indexes; phmap::flat_hash_set<FieldType> field_types; bool have_nulls = false; }; /// Visitor that keeps @num_dimensions_to_keep dimensions in arrays /// and replaces all scalars or nested arrays to @replacement at that level. class FieldVisitorReplaceScalars : public StaticVisitor<Field> { public: FieldVisitorReplaceScalars(const Field& replacement_, size_t num_dimensions_to_keep_) : replacement(replacement_), num_dimensions_to_keep(num_dimensions_to_keep_) {} Field operator()(const Array& x) const { if (num_dimensions_to_keep == 0) { return replacement; } const size_t size = x.size(); Array res(size); for (size_t i = 0; i < size; ++i) { res[i] = apply_visitor( FieldVisitorReplaceScalars(replacement, num_dimensions_to_keep - 1), x[i]); } return res; } template <typename T> Field operator()(const T&) const { return replacement; } private: const Field& replacement; size_t num_dimensions_to_keep; }; } // namespace template <typename Visitor> void get_field_info_impl(const Field& field, FieldInfo* info) { Visitor to_scalar_type_visitor; apply_visitor(to_scalar_type_visitor, field); TypeIndex type_id; to_scalar_type_visitor.get_scalar_type(&type_id); // array item's dimension may missmatch, eg. 
[1, 2, [1, 2, 3]] *info = { type_id, to_scalar_type_visitor.contain_nulls(), to_scalar_type_visitor.need_convert_field(), apply_visitor(FieldVisitorToNumberOfDimensions(), field), }; } void get_field_info(const Field& field, FieldInfo* info) { if (field.is_complex_field()) { get_field_info_impl<FieldVisitorToScalarType>(field, info); } else { get_field_info_impl<SimpleFieldVisitorToScalarType>(field, info); } } ColumnObject::Subcolumn::Subcolumn(MutableColumnPtr&& data_, DataTypePtr type, bool is_nullable_, bool is_root_) : least_common_type(type), is_nullable(is_nullable_), is_root(is_root_) { data.push_back(std::move(data_)); data_types.push_back(type); } ColumnObject::Subcolumn::Subcolumn(size_t size_, bool is_nullable_, bool is_root_) : least_common_type(std::make_shared<DataTypeNothing>()), is_nullable(is_nullable_), num_of_defaults_in_prefix(size_), is_root(is_root_) {} size_t ColumnObject::Subcolumn::Subcolumn::size() const { size_t res = num_of_defaults_in_prefix; for (const auto& part : data) { res += part->size(); } return res; } size_t ColumnObject::Subcolumn::Subcolumn::byteSize() const { size_t res = 0; for (const auto& part : data) { res += part->byte_size(); } return res; } size_t ColumnObject::Subcolumn::Subcolumn::allocatedBytes() const { size_t res = 0; for (const auto& part : data) { res += part->allocated_bytes(); } return res; } void ColumnObject::Subcolumn::insert(Field field) { FieldInfo info; get_field_info(field, &info); insert(std::move(field), std::move(info)); } void ColumnObject::Subcolumn::add_new_column_part(DataTypePtr type) { data.push_back(type->create_column()); least_common_type = LeastCommonType {type}; data_types.push_back(type); } void ColumnObject::Subcolumn::insert(Field field, FieldInfo info) { auto base_type = WhichDataType(info.scalar_type_id); if (base_type.is_nothing() && info.num_dimensions == 0) { insert_default(); return; } auto column_dim = least_common_type.get_dimensions(); auto value_dim = info.num_dimensions; if 
(is_nothing(least_common_type.get_base())) { column_dim = value_dim; } if (base_type.is_nothing()) { value_dim = column_dim; } bool type_changed = false; if (value_dim != column_dim || info.num_dimensions >= 2) { // Deduce to JSONB VLOG_DEBUG << fmt::format( "Dimension of types mismatched between inserted value and column, " "expected:{}, but meet:{} for type:{}", column_dim, value_dim, least_common_type.get()->get_name()); base_type = MOST_COMMON_TYPE_ID; value_dim = 0; type_changed = true; } if (data.empty()) { add_new_column_part(create_array_of_type(base_type.idx, value_dim, is_nullable)); } else if (least_common_type.get_base_type_id() != base_type.idx && !base_type.is_nothing()) { if (schema_util::is_conversion_required_between_integers( base_type.idx, least_common_type.get_base_type_id())) { VLOG_DEBUG << "Conversion between " << getTypeName(base_type.idx) << " and " << getTypeName(least_common_type.get_type_id()); DataTypePtr base_data_type; TypeIndex base_data_type_id; get_least_supertype_jsonb( TypeIndexSet {base_type.idx, least_common_type.get_base_type_id()}, &base_data_type); type_changed = true; base_data_type_id = base_data_type->get_type_id(); if (is_nullable) { base_data_type = make_nullable(base_data_type); } if (!least_common_type.get_base()->equals(*base_data_type)) { add_new_column_part( create_array_of_type(base_data_type_id, value_dim, is_nullable)); } } } if (type_changed || info.need_convert) { Field new_field; convert_field_to_type(field, *least_common_type.get(), &new_field); field = new_field; } data.back()->insert(field); } static DataTypePtr create_array(TypeIndex type, size_t num_dimensions) { DataTypePtr result_type = make_nullable(DataTypeFactory::instance().create_data_type(type)); for (size_t i = 0; i < num_dimensions; ++i) { result_type = make_nullable(std::make_shared<DataTypeArray>(result_type)); } return result_type; } Array create_empty_array_field(size_t num_dimensions) { if (num_dimensions == 0) { throw 
doris::Exception(ErrorCode::INVALID_ARGUMENT, "Cannot create array field with 0 dimensions"); } Array array; Array* current_array = &array; for (size_t i = 1; i < num_dimensions; ++i) { current_array->push_back(Array()); current_array = &current_array->back().get<Array&>(); } return array; } // Recreates column with default scalar values and keeps sizes of arrays. static ColumnPtr recreate_column_with_default_values(const ColumnPtr& column, TypeIndex scalar_type, size_t num_dimensions) { const auto* column_array = check_and_get_column<ColumnArray>(remove_nullable(column).get()); if (column_array != nullptr && num_dimensions != 0) { return make_nullable(ColumnArray::create( recreate_column_with_default_values(column_array->get_data_ptr(), scalar_type, num_dimensions - 1), IColumn::mutate(column_array->get_offsets_ptr()))); } return create_array(scalar_type, num_dimensions) ->create_column() ->clone_resized(column->size()); } ColumnObject::Subcolumn ColumnObject::Subcolumn::clone_with_default_values( const FieldInfo& field_info) const { Subcolumn new_subcolumn(*this); new_subcolumn.least_common_type = LeastCommonType {create_array(field_info.scalar_type_id, field_info.num_dimensions)}; for (int i = 0; i < new_subcolumn.data.size(); ++i) { new_subcolumn.data[i] = recreate_column_with_default_values( new_subcolumn.data[i], field_info.scalar_type_id, field_info.num_dimensions); new_subcolumn.data_types[i] = create_array_of_type(field_info.scalar_type_id, field_info.num_dimensions, is_nullable); } return new_subcolumn; } Field ColumnObject::Subcolumn::get_last_field() const { if (data.empty()) { return Field(); } const auto& last_part = data.back(); assert(!last_part->empty()); return (*last_part)[last_part->size() - 1]; } void ColumnObject::Subcolumn::insert_range_from(const Subcolumn& src, size_t start, size_t length) { if (start + length > src.size()) { throw doris::Exception( ErrorCode::OUT_OF_BOUND, "Invalid range for insert_range_from: start={}, length={}, 
src.size={}", start, length, src.size()); } size_t end = start + length; // num_rows += length; if (data.empty()) { add_new_column_part(src.get_least_common_type()); } else if (!least_common_type.get()->equals(*src.get_least_common_type())) { DataTypePtr new_least_common_type; get_least_supertype_jsonb(DataTypes {least_common_type.get(), src.get_least_common_type()}, &new_least_common_type); if (!new_least_common_type->equals(*least_common_type.get())) { add_new_column_part(std::move(new_least_common_type)); } } if (end <= src.num_of_defaults_in_prefix) { data.back()->insert_many_defaults(length); return; } if (start < src.num_of_defaults_in_prefix) { data.back()->insert_many_defaults(src.num_of_defaults_in_prefix - start); } auto insert_from_part = [&](const auto& column, const auto& column_type, size_t from, size_t n) { if (from + n > column->size()) { throw doris::Exception( ErrorCode::OUT_OF_BOUND, "Invalid range for insert_range_from: from={}, n={}, column.size={}", from, n, column->size()); } if (column_type->equals(*least_common_type.get())) { data.back()->insert_range_from(*column, from, n); return; } /// If we need to insert large range, there is no sense to cut part of column and cast it. /// Casting of all column and inserting from it can be faster. /// Threshold is just a guess. 
if (n * 3 >= column->size()) { ColumnPtr casted_column; Status st = schema_util::cast_column({column, column_type, ""}, least_common_type.get(), &casted_column); if (!st.ok()) { throw doris::Exception(ErrorCode::INVALID_ARGUMENT, st.to_string()); } data.back()->insert_range_from(*casted_column, from, n); return; } auto casted_column = column->cut(from, n); Status st = schema_util::cast_column({casted_column, column_type, ""}, least_common_type.get(), &casted_column); if (!st.ok()) { throw doris::Exception(ErrorCode::INVALID_ARGUMENT, st.to_string()); } data.back()->insert_range_from(*casted_column, 0, n); }; size_t pos = 0; size_t processed_rows = src.num_of_defaults_in_prefix; /// Find the first part of the column that intersects the range. while (pos < src.data.size() && processed_rows + src.data[pos]->size() < start) { processed_rows += src.data[pos]->size(); ++pos; } /// Insert from the first part of column. if (pos < src.data.size() && processed_rows < start) { size_t part_start = start - processed_rows; size_t part_length = std::min(src.data[pos]->size() - part_start, end - start); insert_from_part(src.data[pos], src.data_types[pos], part_start, part_length); processed_rows += src.data[pos]->size(); ++pos; } /// Insert from the parts of column in the middle of range. while (pos < src.data.size() && processed_rows + src.data[pos]->size() < end) { insert_from_part(src.data[pos], src.data_types[pos], 0, src.data[pos]->size()); processed_rows += src.data[pos]->size(); ++pos; } /// Insert from the last part of column if needed. 
if (pos < src.data.size() && processed_rows < end) { size_t part_end = end - processed_rows; insert_from_part(src.data[pos], src.data_types[pos], 0, part_end); } } bool ColumnObject::Subcolumn::is_finalized() const { return num_of_defaults_in_prefix == 0 && (data.empty() || (data.size() == 1)); } template <typename Func> MutableColumnPtr ColumnObject::apply_for_subcolumns(Func&& func) const { if (!is_finalized()) { auto finalized = clone_finalized(); auto& finalized_object = assert_cast<ColumnObject&>(*finalized); return finalized_object.apply_for_subcolumns(std::forward<Func>(func)); } auto res = ColumnObject::create(is_nullable, false); for (const auto& subcolumn : subcolumns) { auto new_subcolumn = func(subcolumn->data.get_finalized_column()); res->add_sub_column(subcolumn->path, new_subcolumn->assume_mutable(), subcolumn->data.get_least_common_type()); } check_consistency(); return res; } void ColumnObject::resize(size_t n) { if (n == num_rows) { return; } if (n > num_rows) { insert_many_defaults(n - num_rows); } else { for (auto& subcolumn : subcolumns) { subcolumn->data.pop_back(num_rows - n); } } num_rows = n; } bool ColumnObject::Subcolumn::check_if_sparse_column(size_t arg_num_rows) { if (arg_num_rows < config::variant_threshold_rows_to_estimate_sparse_column) { return false; } std::vector<double> defaults_ratio; for (size_t i = 0; i < data.size(); ++i) { defaults_ratio.push_back(data[i]->get_ratio_of_default_rows()); } double default_ratio = std::accumulate(defaults_ratio.begin(), defaults_ratio.end(), 0.0) / static_cast<double>(defaults_ratio.size()); return default_ratio >= config::variant_ratio_of_defaults_as_sparse_column; } void ColumnObject::Subcolumn::finalize(FinalizeMode mode) { if (is_finalized()) { return; } if (data.size() == 1 && num_of_defaults_in_prefix == 0) { data[0] = data[0]->convert_to_full_column_if_const(); return; } DataTypePtr to_type = least_common_type.get(); if (mode == FinalizeMode::WRITE_MODE && is_root) { // Root always JSONB 
type in write mode to_type = is_nullable ? make_nullable(std::make_shared<MostCommonType>()) : std::make_shared<MostCommonType>(); least_common_type = LeastCommonType {to_type}; } auto result_column = to_type->create_column(); if (num_of_defaults_in_prefix) { result_column->insert_many_defaults(num_of_defaults_in_prefix); } for (size_t i = 0; i < data.size(); ++i) { auto& part = data[i]; auto from_type = data_types[i]; part = part->convert_to_full_column_if_const(); size_t part_size = part->size(); if (!from_type->equals(*to_type)) { ColumnPtr ptr; Status st = schema_util::cast_column({part, from_type, ""}, to_type, &ptr); if (!st.ok()) { throw doris::Exception(ErrorCode::INVALID_ARGUMENT, st.to_string()); } part = ptr->convert_to_full_column_if_const(); } result_column->insert_range_from(*part, 0, part_size); } data = {std::move(result_column)}; data_types = {std::move(to_type)}; num_of_defaults_in_prefix = 0; } void ColumnObject::Subcolumn::insert_default() { if (data.empty()) { ++num_of_defaults_in_prefix; } else { data.back()->insert_default(); } } void ColumnObject::Subcolumn::insert_many_defaults(size_t length) { if (data.empty()) { num_of_defaults_in_prefix += length; } else { data.back()->insert_many_defaults(length); } } void ColumnObject::Subcolumn::pop_back(size_t n) { if (n > size()) { throw doris::Exception(ErrorCode::OUT_OF_BOUND, "Invalid number of elements to pop: {}, size: {}", n, size()); } size_t num_removed = 0; for (auto it = data.rbegin(); it != data.rend(); ++it) { if (n == 0) { break; } auto& column = *it; if (n < column->size()) { column->pop_back(n); n = 0; } else { ++num_removed; n -= column->size(); } } size_t sz = data.size() - num_removed; data.resize(sz); data_types.resize(sz); // need to update least_common_type when pop_back a column from the last least_common_type = sz > 0 ? 
LeastCommonType {data_types[sz - 1]} : LeastCommonType {std::make_shared<DataTypeNothing>()}; num_of_defaults_in_prefix -= n; } IColumn& ColumnObject::Subcolumn::get_finalized_column() { if (!is_finalized()) { throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Subcolumn is not finalized"); } return *data[0]; } const IColumn& ColumnObject::Subcolumn::get_finalized_column() const { if (!is_finalized()) { throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Subcolumn is not finalized"); } return *data[0]; } const ColumnPtr& ColumnObject::Subcolumn::get_finalized_column_ptr() const { if (!is_finalized()) { throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Subcolumn is not finalized"); } return data[0]; } ColumnPtr& ColumnObject::Subcolumn::get_finalized_column_ptr() { if (!is_finalized()) { throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Subcolumn is not finalized"); } return data[0]; } void ColumnObject::Subcolumn::remove_nullable() { if (!is_finalized()) { throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Subcolumn is not finalized"); } data[0] = doris::vectorized::remove_nullable(data[0]); least_common_type.remove_nullable(); } ColumnObject::Subcolumn::LeastCommonType::LeastCommonType(DataTypePtr type_) : type(std::move(type_)), base_type(get_base_type_of_array(type)), num_dimensions(get_number_of_dimensions(*type)) { least_common_type_serder = type->get_serde(); type_id = type->is_nullable() ? assert_cast<const DataTypeNullable*>(type.get()) ->get_nested_type() ->get_type_id() : type->get_type_id(); base_type_id = base_type->is_nullable() ? 
assert_cast<const DataTypeNullable*>(base_type.get()) ->get_nested_type() ->get_type_id() : base_type->get_type_id(); } ColumnObject::ColumnObject(bool is_nullable_, bool create_root_) : is_nullable(is_nullable_), num_rows(0) { if (create_root_) { subcolumns.create_root(Subcolumn(0, is_nullable, true /*root*/)); } } ColumnObject::ColumnObject(bool is_nullable_, DataTypePtr type, MutableColumnPtr&& column) : is_nullable(is_nullable_) { add_sub_column({}, std::move(column), type); } ColumnObject::ColumnObject(Subcolumns&& subcolumns_, bool is_nullable_) : is_nullable(is_nullable_), subcolumns(std::move(subcolumns_)), num_rows(subcolumns.empty() ? 0 : (*subcolumns.begin())->data.size()) { check_consistency(); } void ColumnObject::check_consistency() const { if (subcolumns.empty()) { return; } for (const auto& leaf : subcolumns) { if (num_rows != leaf->data.size()) { throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, "unmatched column: {}, expeted rows: {}, but meet: {}", leaf->path.get_path(), num_rows, leaf->data.size()); } } } size_t ColumnObject::size() const { #ifndef NDEBUG check_consistency(); #endif return num_rows; } MutableColumnPtr ColumnObject::clone_resized(size_t new_size) const { if (new_size == 0) { return ColumnObject::create(is_nullable); } // If subcolumns are empty, then res will be empty but new_size > 0 if (subcolumns.empty()) { // Add an emtpy column with new_size rows auto res = ColumnObject::create(true, false); res->set_num_rows(new_size); return res; } auto res = apply_for_subcolumns( [&](const auto& subcolumn) { return subcolumn.clone_resized(new_size); }); return res; } size_t ColumnObject::byte_size() const { size_t res = 0; for (const auto& entry : subcolumns) { res += entry->data.byteSize(); } return res; } size_t ColumnObject::allocated_bytes() const { size_t res = 0; for (const auto& entry : subcolumns) { res += entry->data.allocatedBytes(); } return res; } void ColumnObject::for_each_subcolumn(ColumnCallback callback) { for 
(auto& entry : subcolumns) { for (auto& part : entry->data.data) { callback(part); } } } void ColumnObject::insert_from(const IColumn& src, size_t n) { const auto* src_v = check_and_get_column<ColumnObject>(src); // optimize when src and this column are scalar variant, since try_insert is inefficiency if (src_v != nullptr && src_v->is_scalar_variant() && is_scalar_variant() && src_v->get_root_type()->equals(*get_root_type()) && src_v->is_finalized() && is_finalized()) { assert_cast<ColumnNullable&, TypeCheckOnRelease::DISABLE>(*get_root()) .insert_from(*src_v->get_root(), n); ++num_rows; return; } return try_insert(src[n]); } void ColumnObject::try_insert(const Field& field) { if (field.get_type() != Field::Types::VariantMap) { if (field.is_null()) { insert_default(); return; } auto* root = get_subcolumn({}); // Insert to an emtpy ColumnObject may result root null, // so create a root column of Variant is expected. if (root == nullptr) { bool succ = add_sub_column({}, num_rows); if (!succ) { throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT, "Failed to add root sub column {}"); } root = get_subcolumn({}); } root->insert(field); ++num_rows; return; } const auto& object = field.get<const VariantMap&>(); size_t old_size = size(); for (const auto& [key_str, value] : object) { PathInData key; if (!key_str.empty()) { key = PathInData(key_str); } if (!has_subcolumn(key)) { bool succ = add_sub_column(key, old_size); if (!succ) { throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT, "Failed to add sub column {}", key.get_path()); } } auto* subcolumn = get_subcolumn(key); if (!subcolumn) { doris::Exception(doris::ErrorCode::INVALID_ARGUMENT, fmt::format("Failed to find sub column {}", key.get_path())); } subcolumn->insert(value); } for (auto& entry : subcolumns) { if (old_size == entry->data.size()) { bool inserted = try_insert_default_from_nested(entry); if (!inserted) { entry->data.insert_default(); } } } ++num_rows; } void ColumnObject::insert_default() { for 
(auto& entry : subcolumns) { entry->data.insert_default(); } ++num_rows; } void ColumnObject::Subcolumn::get(size_t n, Field& res) const { if (least_common_type.get_base_type_id() == TypeIndex::Nothing) { res = Null(); return; } if (is_finalized()) { if (least_common_type.get_base_type_id() == TypeIndex::JSONB) { // JsonbFiled is special case res = JsonbField(); } get_finalized_column().get(n, res); return; } size_t ind = n; if (ind < num_of_defaults_in_prefix) { res = least_common_type.get()->get_default(); return; } ind -= num_of_defaults_in_prefix; for (size_t i = 0; i < data.size(); ++i) { const auto& part = data[i]; const auto& part_type = data_types[i]; if (ind < part->size()) { res = vectorized::remove_nullable(part_type)->get_default(); part->get(ind, res); Field new_field; convert_field_to_type(res, *least_common_type.get(), &new_field); res = new_field; return; } ind -= part->size(); } throw doris::Exception(ErrorCode::OUT_OF_BOUND, "Index ({}) for getting field is out of range", n); } Field ColumnObject::operator[](size_t n) const { Field object; get(n, object); return object; } void ColumnObject::get(size_t n, Field& res) const { if (UNLIKELY(n >= size())) { throw doris::Exception(ErrorCode::OUT_OF_BOUND, "Index ({}) for getting field is out of range for size {}", n, size()); } res = VariantMap(); auto& object = res.get<VariantMap&>(); for (const auto& entry : subcolumns) { Field field; entry->data.get(n, field); // Notice: we treat null as empty field, since we do not distinguish null and empty for Variant type. 
if (field.get_type() != Field::Types::Null) { object.try_emplace(entry->path.get_path(), field); } } if (object.empty()) { res = Null(); } } void ColumnObject::add_nested_subcolumn(const PathInData& key, const FieldInfo& field_info, size_t new_size) { if (!key.has_nested_part()) { throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Cannot add Nested subcolumn, because path doesn't contain Nested"); } bool inserted = false; /// We find node that represents the same Nested type as @key. const auto* nested_node = subcolumns.find_best_match(key); // TODO a better way to handle following case: // {"a" : {"b" : [{"x" : 10}]}} // {"a" : {"b" : {"c": [{"y" : 10}]}}} // maybe a.b.c.y should not follow from a.b's nested data // If we have a nested subcolumn and it contains nested node in it's path if (nested_node && Subcolumns::find_parent(nested_node, [](const auto& node) { return node.is_nested(); })) { /// Find any leaf of Nested subcolumn. const auto* leaf = Subcolumns::find_leaf(nested_node, [&](const auto&) { return true; }); assert(leaf); /// Recreate subcolumn with default values and the same sizes of arrays. auto new_subcolumn = leaf->data.clone_with_default_values(field_info); /// It's possible that we have already inserted value from current row /// to this subcolumn. So, adjust size to expected. if (new_subcolumn.size() > new_size) { new_subcolumn.pop_back(new_subcolumn.size() - new_size); } assert(new_subcolumn.size() == new_size); inserted = subcolumns.add(key, new_subcolumn); } else { /// If node was not found just add subcolumn with empty arrays. 
inserted = subcolumns.add(key, Subcolumn(new_size, is_nullable)); } if (!inserted) { throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Subcolumn '{}' already exists", key.get_path()); } if (num_rows == 0) { num_rows = new_size; } else if (new_size != num_rows) { throw doris::Exception( ErrorCode::INTERNAL_ERROR, "Required size of subcolumn {} ({}) is inconsistent with column size ({})", key.get_path(), new_size, num_rows); } } void ColumnObject::insert_range_from(const IColumn& src, size_t start, size_t length) { #ifndef NDEBUG check_consistency(); #endif const auto& src_object = assert_cast<const ColumnObject&>(src); for (const auto& entry : src_object.subcolumns) { if (!has_subcolumn(entry->path)) { if (entry->path.has_nested_part()) { FieldInfo field_info { .scalar_type_id = entry->data.least_common_type.get_base_type_id(), .have_nulls = false, .need_convert = false, .num_dimensions = entry->data.get_dimensions()}; add_nested_subcolumn(entry->path, field_info, num_rows); } else { add_sub_column(entry->path, num_rows); } } auto* subcolumn = get_subcolumn(entry->path); subcolumn->insert_range_from(entry->data, start, length); } for (auto& entry : subcolumns) { if (!src_object.has_subcolumn(entry->path)) { bool inserted = try_insert_many_defaults_from_nested(entry); if (!inserted) { entry->data.insert_many_defaults(length); } } } num_rows += length; finalize(FinalizeMode::READ_MODE); #ifndef NDEBUG check_consistency(); #endif } ColumnPtr ColumnObject::replicate(const Offsets& offsets) const { if (num_rows == 0 || subcolumns.empty()) { // Add an emtpy column with offsets.back rows auto res = ColumnObject::create(true, false); res->set_num_rows(offsets.back()); } return apply_for_subcolumns( [&](const auto& subcolumn) { return subcolumn.replicate(offsets); }); } ColumnPtr ColumnObject::permute(const Permutation& perm, size_t limit) const { if (num_rows == 0 || subcolumns.empty()) { if (limit == 0) { limit = num_rows; } else { limit = std::min(num_rows, limit); } if 
(perm.size() < limit) { throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Size of permutation is less than required."); } // Add an emtpy column with limit rows auto res = ColumnObject::create(true, false); res->set_num_rows(limit); return res; } return apply_for_subcolumns( [&](const auto& subcolumn) { return subcolumn.permute(perm, limit); }); } void ColumnObject::pop_back(size_t length) { for (auto& entry : subcolumns) { entry->data.pop_back(length); } num_rows -= length; } const ColumnObject::Subcolumn* ColumnObject::get_subcolumn(const PathInData& key) const { const auto* node = subcolumns.find_leaf(key); if (node == nullptr) { VLOG_DEBUG << "There is no subcolumn " << key.get_path(); return nullptr; } return &node->data; } const ColumnObject::Subcolumn* ColumnObject::get_subcolumn_with_cache(const PathInData& key, size_t key_index) const { // Optimization by caching the order of fields (which is almost always the same) // and a quick check to match the next expected field, instead of searching the hash table. 
if (_prev_positions.size() > key_index && _prev_positions[key_index].second != nullptr && key == _prev_positions[key_index].first) { return _prev_positions[key_index].second; } const auto* subcolumn = get_subcolumn(key); if (key_index >= _prev_positions.size()) { _prev_positions.resize(key_index + 1); } if (subcolumn != nullptr) { _prev_positions[key_index] = std::make_pair(key, subcolumn); } return subcolumn; } ColumnObject::Subcolumn* ColumnObject::get_subcolumn(const PathInData& key, size_t key_index) { return const_cast<ColumnObject::Subcolumn*>(get_subcolumn_with_cache(key, key_index)); } const ColumnObject::Subcolumn* ColumnObject::get_subcolumn(const PathInData& key, size_t key_index) const { return get_subcolumn_with_cache(key, key_index); } ColumnObject::Subcolumn* ColumnObject::get_subcolumn(const PathInData& key) { const auto* node = subcolumns.find_leaf(key); if (node == nullptr) { VLOG_DEBUG << "There is no subcolumn " << key.get_path(); return nullptr; } return &const_cast<Subcolumns::Node*>(node)->data; } bool ColumnObject::has_subcolumn(const PathInData& key) const { return subcolumns.find_leaf(key) != nullptr; } bool ColumnObject::add_sub_column(const PathInData& key, MutableColumnPtr&& subcolumn, DataTypePtr type) { size_t new_size = subcolumn->size(); doc_structure = nullptr; if (key.empty() && subcolumns.empty()) { // create root subcolumns.create_root(Subcolumn(std::move(subcolumn), type, is_nullable, true)); num_rows = new_size; return true; } if (key.empty() && ((!subcolumns.get_root()->is_scalar()) || is_nothing(subcolumns.get_root()->data.get_least_common_type()))) { bool root_it_scalar = subcolumns.get_root()->is_scalar(); // update root to scalar subcolumns.get_mutable_root()->modify_to_scalar( Subcolumn(std::move(subcolumn), type, is_nullable, true)); if (!root_it_scalar) { subcolumns.add_leaf(subcolumns.get_root_ptr()); } if (num_rows == 0) { num_rows = new_size; } return true; } bool inserted = subcolumns.add(key, 
Subcolumn(std::move(subcolumn), type, is_nullable)); if (!inserted) { VLOG_DEBUG << "Duplicated sub column " << key.get_path(); return false; } if (num_rows == 0) { num_rows = new_size; } else if (new_size != num_rows) { VLOG_DEBUG << "Size of subcolumn is in consistent with column"; return false; } return true; } bool ColumnObject::add_sub_column(const PathInData& key, size_t new_size) { if (key.empty() && subcolumns.empty()) { // create root subcolumns.create_root(Subcolumn(new_size, is_nullable, true)); num_rows = new_size; return true; } if (key.empty() && (!subcolumns.get_root()->is_scalar())) { // update none scalar root column to scalar node subcolumns.get_mutable_root()->modify_to_scalar(Subcolumn(new_size, is_nullable, true)); subcolumns.add_leaf(subcolumns.get_root_ptr()); if (num_rows == 0) { num_rows = new_size; } return true; } doc_structure = nullptr; bool inserted = subcolumns.add(key, Subcolumn(new_size, is_nullable)); if (!inserted) { VLOG_DEBUG << "Duplicated sub column " << key.get_path(); return false; } if (num_rows == 0) { num_rows = new_size; } else if (new_size != num_rows) { VLOG_DEBUG << "Size of subcolumn is in consistent with column"; return false; } return true; } PathsInData ColumnObject::getKeys() const { PathsInData keys; keys.reserve(subcolumns.size()); for (const auto& entry : subcolumns) { keys.emplace_back(entry->path); } return keys; } bool ColumnObject::is_finalized() const { return std::all_of(subcolumns.begin(), subcolumns.end(), [](const auto& entry) { return entry->data.is_finalized(); }); } void ColumnObject::Subcolumn::wrapp_array_nullable() { // Wrap array with nullable, treat empty array as null to elimate conflict at present auto& result_column = get_finalized_column_ptr(); if (is_column<vectorized::ColumnArray>(result_column.get()) && !result_column->is_nullable()) { auto new_null_map = ColumnUInt8::create(); new_null_map->reserve(result_column->size()); auto& null_map_data = new_null_map->get_data(); const auto* 
array = static_cast<const ColumnArray*>(result_column.get()); for (size_t i = 0; i < array->size(); ++i) { null_map_data.push_back(array->is_default_at(i)); } result_column = ColumnNullable::create(std::move(result_column), std::move(new_null_map)); data_types[0] = make_nullable(data_types[0]); least_common_type = LeastCommonType {data_types[0]}; } } rapidjson::Value* find_leaf_node_by_path(rapidjson::Value& json, const PathInData& path, int idx = 0) { if (idx >= path.get_parts().size()) { return &json; } std::string_view current_key = path.get_parts()[idx].key; if (!json.IsObject()) { return nullptr; } /*! RapidJSON uses 32-bit array/string indices even on 64-bit platforms, instead of using \c size_t. Users may override the SizeType by defining \ref RAPIDJSON_NO_SIZETYPEDEFINE. */ rapidjson::Value name(current_key.data(), cast_set<unsigned>(current_key.size())); auto it = json.FindMember(name); if (it == json.MemberEnd()) { return nullptr; } rapidjson::Value& current = it->value; // if (idx == path.get_parts().size() - 1) { // return &current; // } return find_leaf_node_by_path(current, path, idx + 1); } // skip empty json: // 1. null value as empty json, todo: think a better way to disinguish empty json and null json. // 2. nested array with only nulls, eg. [null. null],todo: think a better way to deal distinguish array null value and real null value. // 3. empty root jsonb value(not null) // 4. 
type is nothing bool skip_empty_json(const ColumnNullable* nullable, const DataTypePtr& type, TypeIndex base_type_id, size_t row, const PathInData& path) { // skip nulls if (nullable && nullable->is_null_at(row)) { return true; } // check if it is empty nested json array, then skip if (base_type_id == TypeIndex::VARIANT && type->equals(*ColumnObject::NESTED_TYPE)) { Field field = (*nullable)[row]; if (field.get_type() == Field::Types::Array) { const auto& array = field.get<Array>(); bool only_nulls_inside = true; for (const auto& elem : array) { if (elem.get_type() != Field::Types::Null) { only_nulls_inside = false; break; } } // if only nulls then skip return only_nulls_inside; } } // skip empty jsonb value if ((path.empty() && nullable && nullable->get_data_at(row).empty())) { return true; } // skip nothing type if (base_type_id == TypeIndex::Nothing) { return true; } return false; } Status find_and_set_leave_value(const IColumn* column, const PathInData& path, const DataTypeSerDeSPtr& type_serde, const DataTypePtr& type, TypeIndex base_type_index, rapidjson::Value& root, rapidjson::Document::AllocatorType& allocator, Arena& mem_pool, size_t row) { #ifndef NDEBUG // sanitize type and column if (column->get_name() != type->create_column()->get_name()) { return Status::InternalError( "failed to set value for path {}, expected type {}, but got {} at row {}", path.get_path(), type->get_name(), column->get_name(), row); } #endif const auto* nullable = check_and_get_column<ColumnNullable>(column); if (skip_empty_json(nullable, type, base_type_index, row, path)) { return Status::OK(); } // TODO could cache the result of leaf nodes with it's path info rapidjson::Value* target = find_leaf_node_by_path(root, path); if (UNLIKELY(!target)) { rapidjson::StringBuffer buffer; rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); root.Accept(writer); LOG(WARNING) << "could not find path " << path.get_path() << ", root: " << std::string(buffer.GetString(), buffer.GetSize()); 
return Status::NotFound("Not found path {}", path.get_path()); } RETURN_IF_ERROR(type_serde->write_one_cell_to_json(*column, *target, allocator, mem_pool, row)); return Status::OK(); } // compact null values // {"a" : {"b" : "d" {"n" : null}, "e" : null}, "c" : 10 } // after compact -> {"a" : {"c"} : 10} void compact_null_values(rapidjson::Value& json, rapidjson::Document::AllocatorType& allocator) { if (!json.IsObject() || json.IsNull()) { return; } rapidjson::Value::MemberIterator it = json.MemberBegin(); while (it != json.MemberEnd()) { rapidjson::Value& value = it->value; if (value.IsNull()) { it = json.EraseMember(it); continue; } compact_null_values(value, allocator); if (value.IsObject() && value.ObjectEmpty()) { it = json.EraseMember(it); continue; } ++it; } } // Construct rapidjson value from Subcolumns void get_json_by_column_tree(rapidjson::Value& root, rapidjson::Document::AllocatorType& allocator, const ColumnObject::Subcolumns::Node* node_root) { if (node_root == nullptr || node_root->children.empty()) { root.SetNull(); return; } root.SetObject(); // sort to make output stable std::vector<StringRef> sorted_keys = node_root->get_sorted_chilren_keys(); for (const StringRef& key : sorted_keys) { rapidjson::Value value(rapidjson::kObjectType); get_json_by_column_tree(value, allocator, node_root->get_child_node(key).get()); root.AddMember(rapidjson::StringRef(key.data, key.size), value, allocator); } } Status ColumnObject::serialize_one_row_to_string(size_t row, std::string* output) const { if (!is_finalized()) { const_cast<ColumnObject*>(this)->finalize(FinalizeMode::READ_MODE); } rapidjson::StringBuffer buf; if (is_scalar_variant()) { auto type = get_root_type(); *output = type->to_string(*get_root(), row); return Status::OK(); } RETURN_IF_ERROR(serialize_one_row_to_json_format(row, &buf, nullptr)); // TODO avoid copy *output = std::string(buf.GetString(), buf.GetSize()); return Status::OK(); } Status ColumnObject::serialize_one_row_to_string(size_t row, 
BufferWritable& output) const { if (!is_finalized()) { const_cast<ColumnObject*>(this)->finalize(FinalizeMode::READ_MODE); } if (is_scalar_variant()) { auto type = get_root_type(); type->to_string(*get_root(), row, output); return Status::OK(); } rapidjson::StringBuffer buf; RETURN_IF_ERROR(serialize_one_row_to_json_format(row, &buf, nullptr)); output.write(buf.GetString(), buf.GetLength()); return Status::OK(); } Status ColumnObject::serialize_one_row_to_json_format(size_t row, rapidjson::StringBuffer* output, bool* is_null) const { CHECK(is_finalized()); if (subcolumns.empty()) { if (is_null != nullptr) { *is_null = true; } else { rapidjson::Value root(rapidjson::kNullType); rapidjson::Writer<rapidjson::StringBuffer> writer(*output); if (!root.Accept(writer)) { return Status::InternalError("Failed to serialize json value"); } } return Status::OK(); } CHECK(size() > row); rapidjson::StringBuffer buffer; rapidjson::Value root(rapidjson::kNullType); if (doc_structure == nullptr) { doc_structure = std::make_shared<rapidjson::Document>(); rapidjson::Document::AllocatorType& allocator = doc_structure->GetAllocator(); get_json_by_column_tree(*doc_structure, allocator, subcolumns.get_root()); } if (!doc_structure->IsNull()) { root.CopyFrom(*doc_structure, doc_structure->GetAllocator()); } Arena mem_pool; bool serialize_root = true; // Assume all subcolumns are null by default for (const auto& subcolumn : subcolumns) { if (subcolumn->data.is_root) { continue; // Skip the root column } // If any non-root subcolumn is NOT null, set serialize_root to false and exit early if (!assert_cast<const ColumnNullable&, TypeCheckOnRelease::DISABLE>( *subcolumn->data.get_finalized_column_ptr()) .is_null_at(row)) { serialize_root = false; break; } } #ifndef NDEBUG VLOG_DEBUG << "dump structure " << JsonFunctions::print_json_value(*doc_structure); #endif if (serialize_root && subcolumns.get_root()->is_scalar()) { // only serialize root when all other subcolumns is null RETURN_IF_ERROR( 
subcolumns.get_root()->data.get_least_common_type_serde()->write_one_cell_to_json( subcolumns.get_root()->data.get_finalized_column(), root, doc_structure->GetAllocator(), mem_pool, row)); output->Clear(); compact_null_values(root, doc_structure->GetAllocator()); rapidjson::Writer<rapidjson::StringBuffer> writer(*output); root.Accept(writer); return Status::OK(); } // handle subcolumns exclude root node for (const auto& subcolumn : subcolumns) { if (subcolumn->data.is_root) { continue; } RETURN_IF_ERROR(find_and_set_leave_value( subcolumn->data.get_finalized_column_ptr().get(), subcolumn->path, subcolumn->data.get_least_common_type_serde(), subcolumn->data.get_least_common_type(), subcolumn->data.least_common_type.get_base_type_id(), root, doc_structure->GetAllocator(), mem_pool, row)); } compact_null_values(root, doc_structure->GetAllocator()); if (root.IsNull() && is_null != nullptr) { // Fast path *is_null = true; } else { output->Clear(); rapidjson::Writer<rapidjson::StringBuffer> writer(*output); if (!root.Accept(writer)) { return Status::InternalError("Failed to serialize json value"); } } return Status::OK(); } Status ColumnObject::merge_sparse_to_root_column() { CHECK(is_finalized()); if (sparse_columns.empty()) { return Status::OK(); } ColumnPtr src = subcolumns.get_mutable_root()->data.get_finalized_column_ptr(); MutableColumnPtr mresult = src->clone_empty(); const ColumnNullable* src_null = assert_cast<const ColumnNullable*>(src.get()); const ColumnString* src_column_ptr = assert_cast<const ColumnString*>(&src_null->get_nested_column()); rapidjson::StringBuffer buffer; doc_structure = std::make_shared<rapidjson::Document>(); rapidjson::Document::AllocatorType& allocator = doc_structure->GetAllocator(); get_json_by_column_tree(*doc_structure, allocator, sparse_columns.get_root()); #ifndef NDEBUG VLOG_DEBUG << "dump structure " << JsonFunctions::print_json_value(*doc_structure); #endif ColumnNullable* result_column_nullable = 
assert_cast<ColumnNullable*>(mresult->assume_mutable().get()); ColumnString* result_column_ptr = assert_cast<ColumnString*>(&result_column_nullable->get_nested_column()); result_column_nullable->reserve(num_rows); // parse each row to jsonb for (size_t i = 0; i < num_rows; ++i) { // root is not null, store original value, eg. the root is scalar type like '[1]' if (!src_null->empty() && !src_null->is_null_at(i)) { result_column_ptr->insert_data(src_column_ptr->get_data_at(i).data, src_column_ptr->get_data_at(i).size); result_column_nullable->get_null_map_data().push_back(0); continue; } // parse and encode sparse columns buffer.Clear(); rapidjson::Value root(rapidjson::kNullType); if (!doc_structure->IsNull()) { root.CopyFrom(*doc_structure, doc_structure->GetAllocator()); } size_t null_count = 0; Arena mem_pool; for (const auto& subcolumn : sparse_columns) { auto& column = subcolumn->data.get_finalized_column_ptr(); if (assert_cast<const ColumnNullable&, TypeCheckOnRelease::DISABLE>(*column).is_null_at( i)) { ++null_count; continue; } bool succ = find_and_set_leave_value( column.get(), subcolumn->path, subcolumn->data.get_least_common_type_serde(), subcolumn->data.get_least_common_type(), subcolumn->data.least_common_type.get_base_type_id(), root, doc_structure->GetAllocator(), mem_pool, i); if (succ && subcolumn->path.empty() && !root.IsObject()) { // root was modified, only handle root node break; } } // all null values, store null to sparse root if (null_count == sparse_columns.size()) { result_column_ptr->insert_default(); result_column_nullable->get_null_map_data().push_back(1); continue; } // encode sparse columns into jsonb format compact_null_values(root, doc_structure->GetAllocator()); // parse as jsonb value and put back to rootnode // TODO, we could convert to jsonb directly from rapidjson::Value for better performance, instead of parsing JsonbParser parser; rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); root.Accept(writer); bool res = 
parser.parse(buffer.GetString(), buffer.GetSize()); if (!res) { return Status::InvalidArgument( "parse json failed, doc: {}" ", row_num:{}" ", error:{}", std::string(buffer.GetString(), buffer.GetSize()), i, JsonbErrMsg::getErrMsg(parser.getErrorCode())); } result_column_ptr->insert_data(parser.getWriter().getOutput()->getBuffer(), parser.getWriter().getOutput()->getSize()); result_column_nullable->get_null_map_data().push_back(0); } subcolumns.get_mutable_root()->data.get_finalized_column().clear(); // assign merged column, do insert_range_from to make a copy, instead of replace the ptr itselft // to make sure the root column ptr is not changed subcolumns.get_mutable_root()->data.get_finalized_column().insert_range_from( *mresult->get_ptr(), 0, num_rows); return Status::OK(); } void ColumnObject::unnest(Subcolumns::NodePtr& entry, Subcolumns& arg_subcolumns) const { entry->data.finalize(); auto nested_column = entry->data.get_finalized_column_ptr()->assume_mutable(); auto* nested_column_nullable = assert_cast<ColumnNullable*>(nested_column.get()); auto* nested_column_array = assert_cast<ColumnArray*>(nested_column_nullable->get_nested_column_ptr().get()); auto& offset = nested_column_array->get_offsets_ptr(); auto* nested_object_nullable = assert_cast<ColumnNullable*>( nested_column_array->get_data_ptr()->assume_mutable().get()); auto& nested_object_column = assert_cast<ColumnObject&>(nested_object_nullable->get_nested_column()); PathInData nested_path = entry->path; for (auto& nested_entry : nested_object_column.subcolumns) { if (nested_entry->data.least_common_type.get_base_type_id() == TypeIndex::Nothing) { continue; } nested_entry->data.finalize(); PathInDataBuilder path_builder; // format nested path path_builder.append(nested_path.get_parts(), false); path_builder.append(nested_entry->path.get_parts(), true); auto subnested_column = ColumnArray::create( ColumnNullable::create(nested_entry->data.get_finalized_column_ptr(), 
nested_object_nullable->get_null_map_column_ptr()), offset); auto nullable_subnested_column = ColumnNullable::create( subnested_column, nested_column_nullable->get_null_map_column_ptr()); auto type = make_nullable( std::make_shared<DataTypeArray>(nested_entry->data.least_common_type.get())); Subcolumn subcolumn(nullable_subnested_column->assume_mutable(), type, is_nullable); arg_subcolumns.add(path_builder.build(), subcolumn); } } void ColumnObject::finalize(FinalizeMode mode) { Subcolumns new_subcolumns; // finalize root first if (mode == FinalizeMode::WRITE_MODE || !is_null_root()) { new_subcolumns.create_root(subcolumns.get_root()->data); new_subcolumns.get_mutable_root()->data.finalize(mode); } for (auto&& entry : subcolumns) { const auto& least_common_type = entry->data.get_least_common_type(); /// Do not add subcolumns, which consists only from NULLs if (is_nothing(remove_nullable(get_base_type_of_array(least_common_type)))) { continue; } // unnest all nested columns, add them to new_subcolumns if (mode == FinalizeMode::WRITE_MODE && least_common_type->equals(*ColumnObject::NESTED_TYPE)) { unnest(entry, new_subcolumns); continue; } entry->data.finalize(mode); entry->data.wrapp_array_nullable(); if (entry->data.is_root) { continue; } // Check and spilit sparse subcolumns, not support nested array at present if (mode == FinalizeMode::WRITE_MODE && (entry->data.check_if_sparse_column(num_rows)) && !entry->path.has_nested_part()) { // TODO seperate ambiguous path sparse_columns.add(entry->path, entry->data); continue; } new_subcolumns.add(entry->path, entry->data); } std::swap(subcolumns, new_subcolumns); doc_structure = nullptr; _prev_positions.clear(); } void ColumnObject::finalize() { finalize(FinalizeMode::READ_MODE); } void ColumnObject::ensure_root_node_type(const DataTypePtr& expected_root_type) { auto& root = subcolumns.get_mutable_root()->data; if (!root.get_least_common_type()->equals(*expected_root_type)) { // make sure the root type is alawys as 
expected ColumnPtr casted_column; static_cast<void>( schema_util::cast_column(ColumnWithTypeAndName {root.get_finalized_column_ptr(), root.get_least_common_type(), ""}, expected_root_type, &casted_column)); root.data[0] = casted_column; root.data_types[0] = expected_root_type; root.least_common_type = Subcolumn::LeastCommonType {expected_root_type}; } } bool ColumnObject::empty() const { return subcolumns.empty() || subcolumns.begin()->get()->path.get_path() == COLUMN_NAME_DUMMY; } ColumnPtr get_base_column_of_array(const ColumnPtr& column) { if (const auto* column_array = check_and_get_column<ColumnArray>(column.get())) { return column_array->get_data_ptr(); } return column; } ColumnPtr ColumnObject::filter(const Filter& filter, ssize_t count) const { if (!is_finalized()) { auto finalized = clone_finalized(); auto& finalized_object = assert_cast<ColumnObject&>(*finalized); return finalized_object.filter(filter, count); } if (num_rows == 0 || subcolumns.empty()) { // Add an emtpy column with filtered rows auto res = ColumnObject::create(true, false); res->set_num_rows(count_bytes_in_filter(filter)); return res; } auto new_column = ColumnObject::create(true, false); for (auto& entry : subcolumns) { auto subcolumn = entry->data.get_finalized_column().filter(filter, -1); new_column->add_sub_column(entry->path, subcolumn->assume_mutable(), entry->data.get_least_common_type()); } return new_column; } Status ColumnObject::filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) { if (!is_finalized()) { finalize(); } if (num_rows == 0 || subcolumns.empty()) { assert_cast<ColumnObject*>(col_ptr)->insert_many_defaults(sel_size); return Status::OK(); } auto* res = assert_cast<ColumnObject*>(col_ptr); for (const auto& subcolumn : subcolumns) { auto new_subcolumn = subcolumn->data.get_least_common_type()->create_column(); RETURN_IF_ERROR(subcolumn->data.get_finalized_column().filter_by_selector( sel, sel_size, new_subcolumn.get())); 
res->add_sub_column(subcolumn->path, new_subcolumn->assume_mutable(), subcolumn->data.get_least_common_type()); } return Status::OK(); } size_t ColumnObject::filter(const Filter& filter) { if (!is_finalized()) { finalize(); } size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size()); if (count == 0) { for_each_subcolumn([](auto& part) { part->clear(); }); } else { for_each_subcolumn([&](auto& part) { if (part->size() != count) { if (part->is_exclusive()) { const auto result_size = part->filter(filter); if (result_size != count) { throw Exception(ErrorCode::INTERNAL_ERROR, "result_size not euqal with filter_size, result_size={}, " "filter_size={}", result_size, count); } CHECK_EQ(result_size, count); } else { part = part->filter(filter, count); } } }); } num_rows = count; #ifndef NDEBUG check_consistency(); #endif return count; } void ColumnObject::clear_subcolumns_data() { for (auto& entry : subcolumns) { for (auto& part : entry->data.data) { DCHECK_EQ(part->use_count(), 1); (*std::move(part)).clear(); } entry->data.num_of_defaults_in_prefix = 0; } num_rows = 0; } void ColumnObject::clear() { Subcolumns empty; std::swap(empty, subcolumns); num_rows = 0; _prev_positions.clear(); } void ColumnObject::create_root() { auto type = is_nullable ? make_nullable(std::make_shared<MostCommonType>()) : std::make_shared<MostCommonType>(); add_sub_column({}, type->create_column(), type); } void ColumnObject::create_root(const DataTypePtr& type, MutableColumnPtr&& column) { if (num_rows == 0) { num_rows = column->size(); } add_sub_column({}, std::move(column), type); } DataTypePtr ColumnObject::get_most_common_type() const { auto type = is_nullable ? 
make_nullable(std::make_shared<MostCommonType>()) : std::make_shared<MostCommonType>(); return type; } bool ColumnObject::is_null_root() const { auto* root = subcolumns.get_root(); if (root == nullptr) { return true; } if (root->data.num_of_defaults_in_prefix == 0 && (root->data.data.empty() || is_nothing(root->data.get_least_common_type()))) { return true; } return false; } bool ColumnObject::is_scalar_variant() const { // Only root itself return !is_null_root() && subcolumns.get_leaves().size() == 1 && subcolumns.get_root()->is_scalar(); } const DataTypePtr ColumnObject::NESTED_TYPE = std::make_shared<vectorized::DataTypeNullable>( std::make_shared<vectorized::DataTypeArray>(std::make_shared<vectorized::DataTypeNullable>( std::make_shared<vectorized::DataTypeObject>()))); DataTypePtr ColumnObject::get_root_type() const { return subcolumns.get_root()->data.get_least_common_type(); } #define SANITIZE_ROOT() \ if (is_null_root()) { \ return Status::InternalError("No root column, path {}", path.get_path()); \ } \ if (!WhichDataType(remove_nullable(subcolumns.get_root()->data.get_least_common_type())) \ .is_json()) { \ return Status::InternalError( \ "Root column is not jsonb type but {}, path {}", \ subcolumns.get_root()->data.get_least_common_type()->get_name(), path.get_path()); \ } Status ColumnObject::extract_root(const PathInData& path, MutableColumnPtr& dst) const { SANITIZE_ROOT(); if (!path.empty()) { RETURN_IF_ERROR(schema_util::extract(subcolumns.get_root()->data.get_finalized_column_ptr(), path, dst)); } else { if (!dst) { dst = subcolumns.get_root()->data.get_finalized_column_ptr()->clone_empty(); dst->reserve(num_rows); } dst->insert_range_from(*subcolumns.get_root()->data.get_finalized_column_ptr(), 0, num_rows); } return Status::OK(); } void ColumnObject::insert_indices_from(const IColumn& src, const uint32_t* indices_begin, const uint32_t* indices_end) { // optimize when src and this column are scalar variant, since try_insert is inefficiency const 
auto* src_v = check_and_get_column<ColumnObject>(src); bool src_can_do_quick_insert = src_v != nullptr && src_v->is_scalar_variant() && src_v->is_finalized(); // num_rows == 0 means this column is empty, we not need to check it type if (num_rows == 0 && src_can_do_quick_insert) { // add a new root column, and insert from src root column clear(); add_sub_column({}, src_v->get_root()->clone_empty(), src_v->get_root_type()); get_root()->insert_indices_from(*src_v->get_root(), indices_begin, indices_end); num_rows += indices_end - indices_begin; } else if (src_can_do_quick_insert && is_scalar_variant() && src_v->get_root_type()->equals(*get_root_type())) { get_root()->insert_indices_from(*src_v->get_root(), indices_begin, indices_end); num_rows += indices_end - indices_begin; } else { for (const auto* x = indices_begin; x != indices_end; ++x) { try_insert(src[*x]); } } finalize(); } void ColumnObject::for_each_imutable_subcolumn(ImutableColumnCallback callback) const { if (!is_finalized()) { auto finalized = clone_finalized(); auto& finalized_object = assert_cast<ColumnObject&>(*finalized); finalized_object.for_each_imutable_subcolumn(callback); return; } for (const auto& entry : subcolumns) { for (auto& part : entry->data.data) { callback(*part); } } } bool ColumnObject::is_exclusive() const { bool is_exclusive = IColumn::is_exclusive(); for_each_imutable_subcolumn([&](const auto& subcolumn) { if (!subcolumn.is_exclusive()) { is_exclusive = false; } }); return is_exclusive; } void ColumnObject::update_hash_with_value(size_t n, SipHash& hash) const { for_each_imutable_subcolumn( [&](const auto& subcolumn) { return subcolumn.update_hash_with_value(n, hash); }); } void ColumnObject::update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const { for_each_imutable_subcolumn([&](const auto& subcolumn) { return subcolumn.update_hashes_with_value(hashes, nullptr); }); } void ColumnObject::update_xxHash_with_value(size_t start, size_t end, 
uint64_t& hash, const uint8_t* __restrict null_data) const { for_each_imutable_subcolumn([&](const auto& subcolumn) { return subcolumn.update_xxHash_with_value(start, end, hash, nullptr); }); } void ColumnObject::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveType type, uint32_t rows, uint32_t offset, const uint8_t* __restrict null_data) const { for_each_imutable_subcolumn([&](const auto& subcolumn) { return subcolumn.update_crcs_with_value(hash, type, rows, offset, nullptr); }); } void ColumnObject::update_crc_with_value(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_data) const { for_each_imutable_subcolumn([&](const auto& subcolumn) { return subcolumn.update_crc_with_value(start, end, hash, nullptr); }); } std::string ColumnObject::debug_string() const { std::stringstream res; res << get_name() << "(num_row = " << num_rows; for (auto& entry : subcolumns) { if (entry->data.is_finalized()) { res << "[column:" << entry->data.data[0]->dump_structure() << ",type:" << entry->data.data_types[0]->get_name() << ",path:" << entry->path.get_path() << "],"; } } res << ")"; return res.str(); } Status ColumnObject::sanitize() const { #ifndef NDEBUG RETURN_IF_CATCH_EXCEPTION(check_consistency()); for (const auto& subcolumn : subcolumns) { if (subcolumn->data.is_finalized()) { auto column = subcolumn->data.get_least_common_type()->create_column(); std::string original = subcolumn->data.get_finalized_column().get_name(); std::string expected = column->get_name(); if (original != expected) { return Status::InternalError("Incompatible type between {} and {}, debug_info:", original, expected, debug_string()); } } } VLOG_DEBUG << "sanitized " << debug_string(); #endif return Status::OK(); } ColumnObject::Subcolumn ColumnObject::Subcolumn::cut(size_t start, size_t length) const { Subcolumn new_subcolumn(0, is_nullable); new_subcolumn.insert_range_from(*this, start, length); return new_subcolumn; } const ColumnObject::Subcolumns::Node* 
ColumnObject::get_leaf_of_the_same_nested( const Subcolumns::NodePtr& entry) const { const auto* leaf = subcolumns.get_leaf_of_the_same_nested( entry->path, [&](const Subcolumns::Node& node) { return node.data.size() > entry->data.size(); }); if (leaf && is_nothing(leaf->data.get_least_common_typeBase())) { return nullptr; } return leaf; } bool ColumnObject::try_insert_many_defaults_from_nested(const Subcolumns::NodePtr& entry) const { const auto* leaf = get_leaf_of_the_same_nested(entry); if (!leaf) { return false; } size_t old_size = entry->data.size(); FieldInfo field_info = { .scalar_type_id = entry->data.least_common_type.get_base_type_id(), .have_nulls = false, .need_convert = false, .num_dimensions = entry->data.get_dimensions(), }; /// Cut the needed range from the found leaf /// and replace scalar values to the correct /// default values for given entry. auto new_subcolumn = leaf->data.cut(old_size, leaf->data.size() - old_size) .clone_with_default_values(field_info); entry->data.insert_range_from(new_subcolumn, 0, new_subcolumn.size()); return true; } bool ColumnObject::try_insert_default_from_nested(const Subcolumns::NodePtr& entry) const { const auto* leaf = get_leaf_of_the_same_nested(entry); if (!leaf) { return false; } auto last_field = leaf->data.get_last_field(); if (last_field.is_null()) { return false; } size_t leaf_num_dimensions = leaf->data.get_dimensions(); size_t entry_num_dimensions = entry->data.get_dimensions(); auto default_scalar = entry_num_dimensions > leaf_num_dimensions ? create_empty_array_field(entry_num_dimensions - leaf_num_dimensions) : entry->data.get_least_common_type()->get_default(); auto default_field = apply_visitor( FieldVisitorReplaceScalars(default_scalar, leaf_num_dimensions), last_field); entry->data.insert(std::move(default_field)); return true; } } // namespace doris::vectorized