cpp-ch/local-engine/Functions/SparkFunctionGetJsonObject.h (692 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <cerrno> #include <limits> #include <memory> #include <string> #include <string_view> #include <Columns/ColumnNullable.h> #include <Columns/ColumnTuple.h> #include <DataTypes/DataTypeNullable.h> #include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeTuple.h> #include <DataTypes/IDataType.h> #include <Functions/FunctionSQLJSON.h> #include <Functions/IFunction.h> #include <Functions/JSONPath/Generator/GeneratorJSONPath.h> #include <Functions/JSONPath/Parsers/ParserJSONPath.h> #include <Interpreters/Context.h> #include <Parsers/IParser.h> #include <Parsers/TokenIterator.h> #include <base/find_symbols.h> #include <base/range.h> #include <Poco/Logger.h> #include <Poco/StringTokenizer.h> #include <Common/Exception.h> #include <Common/JSONParsers/DummyJSONParser.h> #include <Common/JSONParsers/SimdJSONParser.h> #include <Common/StringUtils.h> #include <Common/logger_useful.h> namespace DB { namespace Setting { extern const SettingsBool allow_simdjson; extern const SettingsUInt64 max_parser_depth; extern const SettingsUInt64 max_parser_backtracks; } namespace ErrorCodes { extern const int LOGICAL_ERROR; extern const int TOO_FEW_ARGUMENTS_FOR_FUNCTION; extern const int ILLEGAL_TYPE_OF_ARGUMENT; extern const int ILLEGAL_COLUMN; extern const int ILLEGAL_TYPE_OF_ARGUMENT; } } namespace local_engine { // We notice that, `get_json_object` have different behavior with `JSON_VALUE/JSON_QUERY`. // - ('{"x":[{"y":1},{"y":2}]}' '$.x[*].y'), `json_value` return only one element, but `get_json_object` return // return a list. // - ('{"x":[{"y":1}]}' '$.x[*].y'), `json_query`'s result is '[1]', // but `get_json_object`'s result is '1' // struct GetJsonObject { static constexpr auto name{"get_json_object"}; }; class JSONTextNormalizer { public: // simd json will fail to parse the json text on some cases, see #7014, #3750, #3337, #5303 // To keep the result same with vanilla, we normalize the json string when simd json fails. // It returns null when normalize the json text fail, otherwise returns a position among `pos` // and `end` which points to the whole json object end. // `dst` refer to a memory buffer that is used to store the normalization result. static const char * normalize(const char * pos, const char * end, char *& dst) { pos = normalizeWhitespace(pos, end, dst); if (!pos || pos >= end) return nullptr; if (*pos == '[') return normalizeArray(pos, end, dst); else if (*pos == '{') return normalizeObject(pos, end, dst); return nullptr; } private: inline static void copyToDst(char *& p, char c) { *p = c; p++; } inline static void copyToDst(char *& p, const char * src, size_t len) { memcpy(p, src, len); p += len; } inline static bool isExpectedChar(char c, const char * pos, const char * end) { return pos && pos < end && *pos == c; } inline static const char * normalizeWhitespace(const char * pos, const char * end, char *& dst) { const auto * start_pos = pos; while (pos && pos < end) { if (isWhitespaceASCII(*pos)) pos++; else break; } if (pos != start_pos) copyToDst(dst, start_pos, pos - start_pos); return pos; } inline static const char * normalizeComma(const char * pos, const char * end, char *& dst) { pos = normalizeWhitespace(pos, end, dst); if (!isExpectedChar(',', pos, end)) [[unlikely]] { // LOG_DEBUG(getLogger("GetJsonObject"), "xxx normalizeComma. not ,"); return nullptr; } pos += 1; copyToDst(dst, ','); return normalizeWhitespace(pos, end, dst); } inline static const char * normalizeColon(const char * pos, const char * end, char *& dst) { pos = normalizeWhitespace(pos, end, dst); if (!isExpectedChar(':', pos, end)) { // LOG_DEBUG(getLogger("GetJsonObject"), "xxx normalizeColon. not :"); return nullptr; } pos += 1; copyToDst(dst, ':'); return normalizeWhitespace(pos, end, dst); } inline static const char * normalizeField(const char * pos, const char * end, char *& dst) { const auto * start_pos = pos; pos = find_first_symbols<',', '}', ']'>(pos, end); if (pos >= end) [[unlikely]] { // LOG_DEBUG(getLogger("GetJsonObject"), "xxx normalizeField. not field"); return nullptr; } if (*start_pos == '"' || *start_pos == '\'') { copyToDst(dst, start_pos, pos - start_pos); } else { // If it's a too large number, replace it with "Infinity". const char * inf_str = "\"\\\"Infinity\\\"\""; size_t inf_str_len = 14; const char * large_e = "308"; const auto * ep = find_first_symbols<'e', 'E'>(start_pos, pos); if (pos - ep < 3) copyToDst(dst, start_pos, pos - start_pos); else if (pos - ep > 4 || (pos - ep == 4 and memcmp(ep + 1, large_e, 3) >= 0)) { if (isTooLargeNumber(start_pos, pos)) { copyToDst(dst, inf_str, inf_str_len); } else { copyToDst(dst, start_pos, pos - start_pos); } } else { copyToDst(dst, start_pos, pos - start_pos); } } return pos; } inline static bool isTooLargeNumber(const char * start, const char * end) { bool res = false; try { double num2 = std::stod(String(start, end)); } catch (const std::invalid_argument & e) { res = false; } catch (const std::out_of_range & e) { res = true; } return res; } inline static const char * normalizeString(const char * pos, const char * end, char *& dst) { const auto * start_pos = pos; if (!isExpectedChar('"', pos, end)) [[unlikely]] { // LOG_DEBUG(getLogger("GetJsonObject"), "xxx normalizeString. not \""); return nullptr; } pos += 1; do { pos = find_first_symbols<'\\', '"'>(pos, end); if (pos != end && *pos == '\\') { // escape charaters. e.g. '\"', '\\' pos += 2; if (pos >= end) return nullptr; } else break; } while (pos != end); pos = find_first_symbols<'"'>(pos, end); if (!isExpectedChar('"', pos, end)) return nullptr; pos += 1; size_t n = 0; for (; start_pos != pos; ++start_pos) { if ((*start_pos >= 0x00 && *start_pos <= 0x1f) || *start_pos == 0x7f) { if (n) { copyToDst(dst, start_pos - n, n); n = 0; } continue; } else { n += 1; } } if (n) copyToDst(dst, start_pos - n, n); return normalizeWhitespace(pos, end, dst); } /// To use simdjson, we need to convert single quotes to double quotes. /// FIXME: It will be OK if we just return a leaf value, but it will have different result for /// returning a object with strings which are wrapped by single quotes. inline static const char * normalizeSingleQuotesString(const char * pos, const char * end, char *& dst) { if (!isExpectedChar('\'', pos, end)) [[unlikely]] { // LOG_DEBUG(getLogger("GetJsonObject"), "xxx normalizeSingleQuotesString. not '"); return nullptr; } pos += 1; const auto * start_pos = pos; copyToDst(dst, '\"'); do { pos = find_first_symbols<'\\', '\''>(pos, end); if (pos < end && *pos == '\\') { // escape charaters. e.g. '\\', '\'' pos += 2; if (pos >= end) return nullptr; } else break; } while (pos != end); pos = find_first_symbols<'\''>(pos, end); if (!isExpectedChar('\'', pos, end)) { // LOG_DEBUG(getLogger("GetJsonObject"), "xxx normalizeSingleQuotesString. not '"); return nullptr; } pos += 1; size_t n = 0; for (; start_pos != pos; ++start_pos) { if ((*start_pos >= 0x00 && *start_pos <= 0x1f) || *start_pos == 0x7f) { if (n) { copyToDst(dst, start_pos - n, n); n = 0; } continue; } else { n += 1; } } if (n && n - 1) copyToDst(dst, start_pos - n, n - 1); copyToDst(dst, '\"'); return normalizeWhitespace(pos, end, dst); } static const char * normalizeArray(const char * pos, const char * end, char *& dst) { if (!isExpectedChar('[', pos, end)) [[unlikely]] { // LOG_DEBUG(getLogger("GetJsonObject"), "xxx normalizeArray. not ["); return nullptr; } pos += 1; copyToDst(dst, '['); pos = normalizeWhitespace(pos, end, dst); bool has_more = false; while (pos && pos < end && *pos != ']') { has_more = false; switch (*pos) { case '{': { pos = normalizeObject(pos, end, dst); break; } case '"': { pos = normalizeString(pos, end, dst); break; } case '\'': { pos = normalizeSingleQuotesString(pos, end, dst); break; } case '[': { pos = normalizeArray(pos, end, dst); break; } default: { pos = normalizeField(pos, end, dst); break; } } if (!isExpectedChar(',', pos, end)) break; pos = normalizeComma(pos, end, dst); has_more = true; } if (!isExpectedChar(']', pos, end) || has_more) { // LOG_DEBUG(getLogger("GetJsonObject"), "xxx normalizeArray. not ]"); return nullptr; } pos += 1; copyToDst(dst, ']'); return normalizeWhitespace(pos, end, dst); } static const char * normalizeObject(const char * pos, const char * end, char *& dst) { if (!isExpectedChar('{', pos, end)) [[unlikely]] { // LOG_DEBUG(getLogger("GetJsonObject"), "xxx normalizeObject. not object start"); return nullptr; } pos += 1; copyToDst(dst, '{'); bool has_more = false; while (pos && pos < end && *pos != '}') { has_more = false; pos = normalizeWhitespace(pos, end, dst); if (pos != end) { if (*pos == '\'') pos = normalizeSingleQuotesString(pos, end, dst); else if (*pos == '"') pos = normalizeString(pos, end, dst); else if (*pos == '}') continue; else return nullptr; } pos = normalizeColon(pos, end, dst); if (!pos) { // LOG_DEBUG(getLogger("GetJsonObject"), "xxx normalizeObject. not :"); break; } switch (*pos) { case '{': { pos = normalizeObject(pos, end, dst); break; } case '"': { pos = normalizeString(pos, end, dst); break; } case '\'': { pos = normalizeSingleQuotesString(pos, end, dst); break; } case '[': { pos = normalizeArray(pos, end, dst); break; } default: { pos = normalizeField(pos, end, dst); break; } } if (!isExpectedChar(',', pos, end)) break; pos = normalizeComma(pos, end, dst); has_more = true; } if (!isExpectedChar('}', pos, end) || has_more) { // LOG_DEBUG(getLogger("GetJsonObject"), "xxx normalizeObject. not object end"); return nullptr; } pos += 1; copyToDst(dst, '}'); return normalizeWhitespace(pos, end, dst); } }; template <typename JSONParser, typename JSONStringSerializer> class GetJsonObjectImpl { public: using Element = typename JSONParser::Element; static DB::DataTypePtr getReturnType(const char *, const DB::ColumnsWithTypeAndName &, bool) { auto nested_type = std::make_shared<DB::DataTypeString>(); return std::make_shared<DB::DataTypeNullable>(nested_type); } static size_t getNumberOfIndexArguments(const DB::ColumnsWithTypeAndName & arguments) { return arguments.size() - 1; } bool insertResultToColumn(DB::IColumn & dest, const Element & root, DB::GeneratorJSONPath<JSONParser> & generator_json_path, bool path_has_asterisk) { Element current_element = root; DB::VisitorStatus status; std::stringstream out; // STYLE_CHECK_ALLOW_STD_STRING_STREAM /// Create json array of results: [res1, res2, ...] bool success = false; std::vector<Element> elements; while ((status = generator_json_path.getNextItem(current_element)) != DB::VisitorStatus::Exhausted) { if (status == DB::VisitorStatus::Ok) { success = true; elements.push_back(current_element); } else if (status == DB::VisitorStatus::Error) { /// ON ERROR /// Here it is possible to handle errors with ON ERROR (as described in ISO/IEC TR 19075-6), /// however this functionality is not implemented yet } current_element = root; } if (!success) { return false; } DB::ColumnNullable & nullable_col_str = assert_cast<DB::ColumnNullable &>(dest); DB::ColumnString * col_str = assert_cast<DB::ColumnString *>(&nullable_col_str.getNestedColumn()); JSONStringSerializer serializer(*col_str); if (elements.size() == 1) [[likely]] { if (elements[0].isNull()) return false; nullable_col_str.getNullMapData().push_back(0); if (elements[0].isString()) { auto str = elements[0].getString(); if (path_has_asterisk) { str = "\"" + std::string(str) + "\""; } serializer.addRawString(str); } else { serializer.addElement(elements[0]); } } else { const char * array_begin = "["; const char * array_end = "]"; const char * comma = ","; bool flag = false; serializer.addRawData(array_begin, 1); nullable_col_str.getNullMapData().push_back(0); for (auto & element : elements) { if (flag) { serializer.addRawData(comma, 1); } serializer.addElement(element); flag = true; } serializer.addRawData(array_end, 1); } serializer.commit(); return true; } }; /// CH uses the lexer to parse the json path, it's not a good idea. /// If a json field containt spaces, we wrap it by double quotes. /// FIXME: If it contains \t, \n, simdjson cannot parse. class JSONPathNormalizer { public: static String normalize(const String & json_path_) { DB::Tokens tokens(json_path_.data(), json_path_.data() + json_path_.size()); DB::IParser::Pos iter(tokens, 0, 0); String res; while (iter->type != DB::TokenType::EndOfStream) { if (isSubPathBegin(iter)) { if (iter->type == DB::TokenType::Number) { normalizeOnNumber(iter, res); } else { // It may begins with '=', '==' and so on. res += "."; ++iter; normalizeOnBareWord(iter, res); } } else normalizeOnOtherTokens(iter, res); } return res; } private: static std::pair<DB::TokenType, StringRef> prevToken(DB::IParser::Pos & iter, size_t n = 1); static std::pair<DB::TokenType, StringRef> nextToken(DB::IParser::Pos & iter, size_t n = 1); static bool isSubPathBegin(DB::IParser::Pos & iter); static void normalizeOnNumber(DB::IParser::Pos & iter, String & res); static void normalizeOnBareWord(DB::IParser::Pos & iter, String & res); static void normalizeOnOtherTokens(DB::IParser::Pos & iter, String & res); }; /// Flatten a json string into a tuple. /// Not use JSONExtract here, since the json path is a complicated expression. class FlattenJSONStringOnRequiredFunction : public DB::IFunction { public: static constexpr auto name = "flattenJSONStringOnRequired"; static DB::FunctionPtr create(const DB::ContextPtr & context) { return std::make_shared<FlattenJSONStringOnRequiredFunction>(context); } explicit FlattenJSONStringOnRequiredFunction(DB::ContextPtr context_) : context(context_) { } ~FlattenJSONStringOnRequiredFunction() override = default; String getName() const override { return name; } size_t getNumberOfArguments() const override { return 2; } bool isVariadic() const override { return false; } bool isSuitableForShortCircuitArgumentsExecution(const DB::DataTypesWithConstInfo & /*arguments*/) const override { return false; } DB::DataTypePtr getReturnTypeImpl(const DB::ColumnsWithTypeAndName & arguments) const override { String json_fields; if (const auto * json_fields_col = typeid_cast<const DB::ColumnConst *>(arguments[1].column.get())) { json_fields = json_fields_col->getDataAt(0).toString(); } else { throw DB::Exception( DB::ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "The second argument of function {} must be a non-constant column", getName()); } Poco::StringTokenizer tokenizer(json_fields, "|"); std::vector<String> names; DB::DataTypes types; DB::DataTypePtr str_type = std::make_shared<DB::DataTypeString>(); str_type = DB::makeNullable(str_type); for (const auto & field : tokenizer) { names.push_back(field); types.push_back(str_type); } return std::make_shared<DB::DataTypeTuple>(types, names); } /// The second argument is required json fields sperated by '|'. DB::ColumnPtr executeImpl( const DB::ColumnsWithTypeAndName & arguments, const DB::DataTypePtr & /*result_type*/, size_t /*input_rows_count*/) const override { #if USE_SIMDJSON if (context->getSettingsRef()[DB::Setting::allow_simdjson]) { return innerExecuteImpl< DB::SimdJSONParser, GetJsonObjectImpl<DB::SimdJSONParser, DB::JSONStringSerializer<DB::SimdJSONParser::Element, DB::SimdJSONElementFormatter>>>( arguments); } #endif return innerExecuteImpl< DB::DummyJSONParser, GetJsonObjectImpl<DB::DummyJSONParser, DB::DefaultJSONStringSerializer<DB::DummyJSONParser::Element>>>(arguments); } private: DB::ContextPtr context; /// If too many rows cannot be parsed by simdjson directly, we will normalize the json text at first; mutable bool is_most_normal_json_text = true; mutable size_t total_parsed_rows = 0; mutable size_t total_normalized_rows = 0; template <typename JSONParser> bool safeParseJson(std::string_view str, JSONParser & parser, JSONParser::Element & doc) const { total_parsed_rows++; if (total_parsed_rows > 10000 && total_normalized_rows * 100 / total_parsed_rows > 90) { is_most_normal_json_text = false; } bool is_doc_ok = false; if (is_most_normal_json_text) { is_doc_ok = parser.parse(str, doc); } if (!is_doc_ok && str.size() > 0) { total_normalized_rows++; std::vector<char> buf; buf.resize(str.size(), 0); char * buf_pos = buf.data(); const char * pos = JSONTextNormalizer::normalize(str.data(), str.data() + str.size(), buf_pos); if (pos) { std::string n_str(buf.data(), buf_pos - buf.data()); // LOG_DEBUG(getLogger("GetJsonObject"), "xxx normalize {} to {}", str, n_str); is_doc_ok = parser.parse(n_str, doc); } } return is_doc_ok; } template <typename JSONParser, typename Impl> DB::ColumnPtr innerExecuteImpl(const DB::ColumnsWithTypeAndName & arguments) const { DB::DataTypePtr str_type = std::make_shared<DB::DataTypeString>(); str_type = DB::makeNullable(str_type); DB::MutableColumns tuple_columns; std::vector<DB::ASTPtr> json_path_asts; std::vector<String> required_fields; std::vector<bool> path_has_asterisk; const auto & first_column = arguments[0]; if (const auto * required_fields_col = typeid_cast<const DB::ColumnConst *>(arguments[1].column.get())) { std::string json_fields = required_fields_col->getDataAt(0).toString(); Poco::StringTokenizer tokenizer(json_fields, "|"); bool path_parsed = true; for (const auto & field : tokenizer) { auto normalized_field = JSONPathNormalizer::normalize(field); // LOG_ERROR(getLogger("JSONPatch"), "xxx field {} -> {}", field, normalized_field); if(normalized_field.find("[*]") != std::string::npos) path_has_asterisk.emplace_back(true); else path_has_asterisk.emplace_back(false); required_fields.push_back(normalized_field); tuple_columns.emplace_back(str_type->createColumn()); const char * query_begin = reinterpret_cast<const char *>(required_fields.back().c_str()); const char * query_end = required_fields.back().c_str() + required_fields.back().size(); DB::Tokens tokens(query_begin, query_end); UInt32 max_parser_depth = static_cast<UInt32>(context->getSettingsRef()[DB::Setting::max_parser_depth]); UInt32 max_parser_backtracks = static_cast<UInt32>(context->getSettingsRef()[DB::Setting::max_parser_backtracks]); DB::IParser::Pos token_iterator(tokens, max_parser_depth, max_parser_backtracks); DB::ASTPtr json_path_ast; DB::ParserJSONPath path_parser; DB::Expected expected; if (!path_parser.parse(token_iterator, json_path_ast, expected)) { path_parsed = false; } json_path_asts.push_back(json_path_ast); } if (!path_parsed) { for (size_t i = 0; i < first_column.column->size(); ++i) { for (size_t j = 0; j < tuple_columns.size(); ++j) tuple_columns[j]->insertDefault(); } return DB::ColumnTuple::create(std::move(tuple_columns)); } } else { throw DB::Exception( DB::ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "The second argument of function {} must be a non-constant column", getName()); } if (!isString(first_column.type)) throw DB::Exception( DB::ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "The first argument of function {} should be a string containing JSON, illegal type: " "{}", String(name), first_column.type->getName()); const DB::ColumnPtr & arg_json = first_column.column; const auto * col_json_const = typeid_cast<const DB::ColumnConst *>(arg_json.get()); const auto * col_json_string = typeid_cast<const DB::ColumnString *>(col_json_const ? col_json_const->getDataColumnPtr().get() : arg_json.get()); if (!col_json_string) throw DB::Exception(DB::ErrorCodes::ILLEGAL_COLUMN, "Illegal column {}", arg_json->getName()); const DB::ColumnString::Chars & chars = col_json_string->getChars(); const DB::ColumnString::Offsets & offsets = col_json_string->getOffsets(); Impl impl; JSONParser parser; using Element = typename JSONParser::Element; Element document; bool document_ok = false; if (col_json_const) { std::string_view json{reinterpret_cast<const char *>(chars.data()), offsets[0] - 1}; document_ok = safeParseJson(json, parser, document); } size_t tuple_size = tuple_columns.size(); std::vector<std::shared_ptr<DB::GeneratorJSONPath<JSONParser>>> generator_json_paths; std::transform( json_path_asts.begin(), json_path_asts.end(), std::back_inserter(generator_json_paths), [](const auto & ast) { return std::make_shared<DB::GeneratorJSONPath<JSONParser>>(ast); }); for (const auto i : collections::range(0, arguments[0].column->size())) { if (!col_json_const) { std::string_view json{reinterpret_cast<const char *>(&chars[offsets[i - 1]]), offsets[i] - offsets[i - 1] - 1}; document_ok = safeParseJson(json, parser, document); } if (document_ok) { for (size_t j = 0; j < tuple_size; ++j) { generator_json_paths[j]->reinitialize(); if (!impl.insertResultToColumn(*tuple_columns[j], document, *generator_json_paths[j], path_has_asterisk[j])) { tuple_columns[j]->insertDefault(); } } } else { for (size_t j = 0; j < tuple_size; ++j) { tuple_columns[j]->insertDefault(); } } } return DB::ColumnTuple::create(std::move(tuple_columns)); } }; }