Status OlapTableBlockConvertor::_internal_validate_column()

in be/src/vec/sink/vtablet_block_convertor.cpp [189:460]


// Validates the entries of one column against `type` and flags offending rows in `_filter_map`.
//
// For each entry that fails a check, a message (error_prefix followed by a type-specific
// description) is recorded through `state->append_error_msg_to_file` and the owning row is
// marked in `_filter_map`. ARRAY / MAP / STRUCT columns are validated recursively through
// `_validate_column`.
//
// Parameters:
//   state        - runtime state used to record error messages.
//   type         - declared data type of `column`.
//   column       - column to validate; when it is a ColumnNullable, its nested column is checked
//                  and its null map is used to skip NULL entries.
//   slot_index   - forwarded unchanged to nested validations.
//   error_prefix - text placed in front of every error message; the ARRAY / MAP / STRUCT
//                  branches append a marker to it before recursing.
//   row_count    - number of entries of `column` to inspect.
//   rows         - optional mapping from an entry index in `column` to the row index used for
//                  `_filter_map`; when null, entry j belongs to row j. When non-null its size
//                  must equal `row_count`.
//
// Returns the first non-OK status produced while recording an error or validating a nested
// column, otherwise Status::OK().
Status OlapTableBlockConvertor::_internal_validate_column(
        RuntimeState* state, const DataTypePtr& type, vectorized::ColumnPtr column,
        size_t slot_index, fmt::memory_buffer& error_prefix, const size_t row_count,
        vectorized::IColumn::Permutation* rows) {
    DCHECK((rows == nullptr) || (rows->size() == row_count));
    fmt::memory_buffer error_msg;
    // Marks `row` as filtered, records error_prefix + error_msg, then resets error_msg so the
    // next failing entry starts from an empty message.
    auto set_invalid_and_append_error_msg = [&](size_t row) {
        _filter_map[row] = true;
        auto ret = state->append_error_msg_to_file([]() -> std::string { return ""; },
                                                   [&error_prefix, &error_msg]() -> std::string {
                                                       return fmt::to_string(error_prefix) +
                                                              fmt::to_string(error_msg);
                                                   });
        error_msg.clear();
        return ret;
    };

    // Unwrap a nullable column: validate the nested data and consult the null map separately.
    auto column_ptr = vectorized::check_and_get_column<vectorized::ColumnNullable>(*column);
    auto& real_column_ptr = column_ptr == nullptr ? column : (column_ptr->get_nested_column_ptr());
    auto null_map = column_ptr == nullptr ? nullptr : column_ptr->get_null_map_data().data();
    // An entry needs validation only if its row is not filtered yet and the entry is not NULL.
    // `j` indexes the column (and the null map), `row` indexes `_filter_map`.
    auto need_to_validate = [&null_map, this](size_t j, size_t row) {
        return !_filter_map[row] && (null_map == nullptr || null_map[j] == 0);
    };

    // Checks every string against the configured soft limit and, when the declared string type
    // carries a non-negative length, against that length as well.
    auto string_column_checker = [&](const ColumnString* column_string) {
        int limit = config::string_type_length_soft_limit_bytes;
        int len = -1;
        // when type.len is negative, std::min will return overflow value, so we need to check it
        if (const auto* type_str =
                    check_and_get_data_type<DataTypeString>(remove_nullable(type).get())) {
            if (type_str->len() >= 0) {
                len = type_str->len();
                limit = std::min(limit, type_str->len());
            }
        }

        // Cheap pre-scan over the offsets: only run the per-entry loop when at least one string
        // exceeds the limit.
        // NOTE(review): offsets[j - 1] is read with j == 0; presumably the offsets container
        // tolerates index -1 (yielding 0) - confirm. The signed index is kept for that reason.
        auto* __restrict offsets = column_string->get_offsets().data();
        int invalid_count = 0;
        for (int64_t j = 0; j < row_count; ++j) {
            invalid_count += (offsets[j] - offsets[j - 1]) > limit;
        }

        if (invalid_count) {
            for (size_t j = 0; j < row_count; ++j) {
                auto row = rows ? (*rows)[j] : j;
                if (need_to_validate(j, row)) {
                    auto str_val = column_string->get_data_at(j);
                    bool invalid = str_val.size > limit;
                    if (invalid) {
                        // NOTE(review): when no schema length is known `len` stays -1; this
                        // comparison then presumably relies on the signed-to-unsigned conversion
                        // to be false so that the soft-limit message below is used - confirm.
                        if (str_val.size > len) {
                            fmt::format_to(error_msg, "{}",
                                           "the length of input is too long than schema. ");
                            fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ",
                                           str_val.to_prefix(32));
                            fmt::format_to(error_msg, "schema length: {}; ", len);
                            fmt::format_to(error_msg, "actual length: {}; ", str_val.size);
                        } else if (str_val.size > limit) {
                            fmt::format_to(
                                    error_msg, "{}",
                                    "the length of input string is too long than vec schema. ");
                            fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ",
                                           str_val.to_prefix(32));
                            fmt::format_to(error_msg, "schema length: {}; ", len);
                            fmt::format_to(error_msg, "limit length: {}; ", limit);
                            fmt::format_to(error_msg, "actual length: {}; ", str_val.size);
                        }
                        RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
                    }
                }
            }
        }
        return Status::OK();
    };

    switch (type->get_primitive_type()) {
    case TYPE_CHAR:
    case TYPE_VARCHAR:
    case TYPE_STRING: {
        const auto column_string =
                assert_cast<const vectorized::ColumnString*>(real_column_ptr.get());
        RETURN_IF_ERROR(string_column_checker(column_string));
        break;
    }
    case TYPE_JSONB: {
        const auto* column_string =
                assert_cast<const vectorized::ColumnString*>(real_column_ptr.get());
        for (size_t j = 0; j < row_count; ++j) {
            // Translate the entry index into the owning row, exactly like the other branches,
            // so that JSONB values nested inside a complex type flag the correct row.
            auto row = rows ? (*rows)[j] : j;
            if (!_filter_map[row]) {
                if (type->is_nullable() && column_ptr && column_ptr->is_null_at(j)) {
                    continue;
                }
                auto str_val = column_string->get_data_at(j);
                bool invalid = str_val.size == 0;
                if (invalid) {
                    error_msg.clear();
                    fmt::format_to(error_msg, "{}", "jsonb with size 0 is invalid");
                    RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
                }
            }
        }
        break;
    }
    case TYPE_DECIMALV2: {
        // The column is modified in place (values are rounded to the declared scale), hence the
        // const_cast.
        auto* column_decimal = const_cast<vectorized::ColumnDecimal<vectorized::Decimal128V2>*>(
                assert_cast<const vectorized::ColumnDecimal<vectorized::Decimal128V2>*>(
                        real_column_ptr.get()));
        const auto& max_decimalv2 = _get_decimalv2_min_or_max<false>(type);
        const auto& min_decimalv2 = _get_decimalv2_min_or_max<true>(type);
        for (size_t j = 0; j < row_count; ++j) {
            auto row = rows ? (*rows)[j] : j;
            if (need_to_validate(j, row)) {
                auto dec_val = binary_cast<vectorized::Int128, DecimalV2Value>(
                        column_decimal->get_data()[j]);
                bool invalid = false;

                // Round values that carry more fractional digits than the declared scale and
                // write the rounded value back into the column.
                if (dec_val.greater_than_scale(type->get_scale())) {
                    auto code =
                            dec_val.round(&dec_val, remove_nullable(type)->get_scale(), HALF_UP);
                    column_decimal->get_data()[j] = dec_val.value();

                    if (code != E_DEC_OK) {
                        fmt::format_to(error_msg, "round one decimal failed.value={}; ",
                                       dec_val.to_string());
                        invalid = true;
                    }
                }
                // Range check against the min/max derived from the declared type.
                if (dec_val > max_decimalv2 || dec_val < min_decimalv2) {
                    fmt::format_to(error_msg, "{}", "decimal value is not valid for definition");
                    fmt::format_to(error_msg, ", value={}", dec_val.to_string());
                    fmt::format_to(error_msg, ", precision={}, scale={}", type->get_precision(),
                                   type->get_scale());
                    fmt::format_to(error_msg, ", min={}, max={}; ", min_decimalv2.to_string(),
                                   max_decimalv2.to_string());
                    invalid = true;
                }

                if (invalid) {
                    RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
                }
            }
        }
        break;
    }
    case TYPE_DECIMAL32: {
// Shared body of the DECIMAL32/64/128I/256 branches: a branch-light pre-scan counts out-of-range
// values, and only when at least one exists does the per-entry loop run to report errors for
// entries that still need validation.
#define CHECK_VALIDATION_FOR_DECIMALV3(DecimalType)                                               \
    auto column_decimal = const_cast<vectorized::ColumnDecimal<DecimalType>*>(                    \
            assert_cast<const vectorized::ColumnDecimal<DecimalType>*>(real_column_ptr.get()));   \
    const auto& max_decimal = _get_decimalv3_min_or_max<DecimalType, false>(type);                \
    const auto& min_decimal = _get_decimalv3_min_or_max<DecimalType, true>(type);                 \
    const auto* __restrict datas = column_decimal->get_data().data();                             \
    int invalid_count = 0;                                                                        \
    for (size_t j = 0; j < row_count; ++j) {                                                      \
        const auto dec_val = datas[j];                                                            \
        invalid_count += dec_val > max_decimal || dec_val < min_decimal;                          \
    }                                                                                             \
    if (invalid_count) {                                                                          \
        for (size_t j = 0; j < row_count; ++j) {                                                  \
            auto row = rows ? (*rows)[j] : j;                                                     \
            if (need_to_validate(j, row)) {                                                       \
                auto dec_val = column_decimal->get_data()[j];                                     \
                bool invalid = false;                                                             \
                if (dec_val > max_decimal || dec_val < min_decimal) {                             \
                    fmt::format_to(error_msg, "{}", "decimal value is not valid for definition"); \
                    fmt::format_to(error_msg, ", value={}", dec_val);                             \
                    fmt::format_to(error_msg, ", precision={}, scale={}", type->get_precision(),  \
                                   type->get_scale());                                            \
                    fmt::format_to(error_msg, ", min={}, max={}; ", min_decimal, max_decimal);    \
                    invalid = true;                                                               \
                }                                                                                 \
                if (invalid) {                                                                    \
                    RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));                       \
                }                                                                                 \
            }                                                                                     \
        }                                                                                         \
    }
        CHECK_VALIDATION_FOR_DECIMALV3(vectorized::Decimal32);
        break;
    }
    case TYPE_DECIMAL64: {
        CHECK_VALIDATION_FOR_DECIMALV3(vectorized::Decimal64);
        break;
    }
    case TYPE_DECIMAL128I: {
        CHECK_VALIDATION_FOR_DECIMALV3(vectorized::Decimal128V3);
        break;
    }
    case TYPE_DECIMAL256: {
        CHECK_VALIDATION_FOR_DECIMALV3(vectorized::Decimal256);
        break;
    }
#undef CHECK_VALIDATION_FOR_DECIMALV3
    case TYPE_ARRAY: {
        const auto* column_array =
                assert_cast<const vectorized::ColumnArray*>(real_column_ptr.get());
        const auto* type_array =
                assert_cast<const vectorized::DataTypeArray*>(remove_nullable(type).get());
        auto nested_type = type_array->get_nested_type();
        const auto& offsets = column_array->get_offsets();
        // Build a mapping from every nested element to the row that owns it, so that a failing
        // element flags its row in `_filter_map`.
        // NOTE(review): offsets[r - 1] is read with r == 0; presumably the offsets container
        // tolerates that index (yielding 0) - confirm.
        vectorized::IColumn::Permutation permutation(offsets.back());
        for (size_t r = 0; r < row_count; ++r) {
            for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) {
                permutation[c] = rows ? (*rows)[r] : r;
            }
        }
        fmt::format_to(error_prefix, "ARRAY type failed: ");
        RETURN_IF_ERROR(_validate_column(state, nested_type, column_array->get_data_ptr(),
                                         slot_index, error_prefix, permutation.size(),
                                         &permutation));
        break;
    }
    case TYPE_MAP: {
        const auto column_map = assert_cast<const vectorized::ColumnMap*>(real_column_ptr.get());
        const auto* type_map =
                assert_cast<const vectorized::DataTypeMap*>(remove_nullable(type).get());
        auto key_type = type_map->get_key_type();
        auto val_type = type_map->get_value_type();
        const auto& offsets = column_map->get_offsets();
        // Same element-to-row mapping as for ARRAY; it is shared by the key and value columns.
        vectorized::IColumn::Permutation permutation(offsets.back());
        for (size_t r = 0; r < row_count; ++r) {
            for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) {
                permutation[c] = rows ? (*rows)[r] : r;
            }
        }
        fmt::format_to(error_prefix, "MAP type failed: ");
        RETURN_IF_ERROR(_validate_column(state, key_type, column_map->get_keys_ptr(), slot_index,
                                         error_prefix, permutation.size(), &permutation));
        RETURN_IF_ERROR(_validate_column(state, val_type, column_map->get_values_ptr(), slot_index,
                                         error_prefix, permutation.size(), &permutation));
        break;
    }
    case TYPE_STRUCT: {
        const auto column_struct =
                assert_cast<const vectorized::ColumnStruct*>(real_column_ptr.get());
        const auto* type_struct =
                assert_cast<const vectorized::DataTypeStruct*>(remove_nullable(type).get());
        DCHECK(type_struct->get_elements().size() == column_struct->tuple_size());
        fmt::format_to(error_prefix, "STRUCT type failed: ");
        for (size_t sc = 0; sc < column_struct->tuple_size(); ++sc) {
            // Forward `rows`: each field has one entry per struct entry, so the entry-to-row
            // mapping of the struct applies to its fields unchanged. Without it, fields of a
            // struct nested in an ARRAY/MAP would treat element indices as row indices.
            RETURN_IF_ERROR(_validate_column(
                    state, type_struct->get_element(sc), column_struct->get_column_ptr(sc),
                    slot_index, error_prefix, column_struct->get_column_ptr(sc)->size(), rows));
        }
        break;
    }
    case TYPE_AGG_STATE: {
        // Only the string-backed representation is length-checked; other column kinds pass.
        auto* column_string = vectorized::check_and_get_column<ColumnString>(*real_column_ptr);
        if (column_string) {
            RETURN_IF_ERROR(string_column_checker(column_string));
        }
        break;
    }
    default:
        break;
    }

    // Dispose the column should do not contain the NULL value
    // Only two case:
    // 1. column is nullable but the desc is not nullable
    // 2. desc->type is BITMAP
    if ((!type->is_nullable() || type->get_primitive_type() == TYPE_OBJECT) && column_ptr) {
        for (size_t j = 0; j < row_count; ++j) {
            auto row = rows ? (*rows)[j] : j;
            if (null_map[j] && !_filter_map[row]) {
                fmt::format_to(error_msg, "null value for not null column, type={}",
                               type->get_name());
                RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
            }
        }
    }

    return Status::OK();
}