in cpp-ch/local-engine/Functions/SparkFunctionGetJsonObject.h [683:805]
/// Extracts several JSON paths from every row of a JSON string column.
///
/// @param arguments  arguments[0] is the JSON text and must be of String type (constant or
///                   full column). arguments[1] must be a constant column whose value is the
///                   list of requested paths joined with '|'.
/// @return A ColumnTuple holding one Nullable(String) column per requested path, with one
///         entry per row of arguments[0]. An entry is the column default when the row's JSON
///         cannot be parsed or when Impl::insertResultToColumn reports no result for the path.
///         If any requested path fails to parse as a JSON path, every entry of every column
///         is the column default.
/// @throws DB::Exception ILLEGAL_TYPE_OF_ARGUMENT if arguments[1] is not constant or
///         arguments[0] is not a String; ILLEGAL_COLUMN if arguments[0] is not backed by a
///         ColumnString.
DB::ColumnPtr innerExecuteImpl(const DB::ColumnsWithTypeAndName & arguments) const
{
    DB::DataTypePtr str_type = std::make_shared<DB::DataTypeString>();
    str_type = DB::makeNullable(str_type);
    DB::MutableColumns tuple_columns;
    std::vector<DB::ASTPtr> json_path_asts;
    std::vector<String> required_fields;
    std::vector<bool> path_has_asterisk;
    const auto & first_column = arguments[0];

    // The path list is read once from row 0, so it has to be a constant column.
    const auto * required_fields_col = typeid_cast<const DB::ColumnConst *>(arguments[1].column.get());
    if (!required_fields_col)
    {
        throw DB::Exception(
            DB::ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "The second argument of function {} must be a constant column", getName());
    }

    std::string json_fields = required_fields_col->getDataAt(0).toString();
    Poco::StringTokenizer tokenizer(json_fields, "|");

    // Parser limits do not depend on the individual path; read them once.
    const UInt32 max_parser_depth = static_cast<UInt32>(context->getSettingsRef()[DB::Setting::max_parser_depth]);
    const UInt32 max_parser_backtracks = static_cast<UInt32>(context->getSettingsRef()[DB::Setting::max_parser_backtracks]);

    bool path_parsed = true;
    for (const auto & field : tokenizer)
    {
        auto normalized_field = JSONPathNormalizer::normalize(field);
        // Remember whether the path contains a wildcard array step; it is forwarded to
        // Impl::insertResultToColumn for each row.
        path_has_asterisk.emplace_back(normalized_field.find("[*]") != std::string::npos);
        required_fields.push_back(std::move(normalized_field));
        tuple_columns.emplace_back(str_type->createColumn());

        // Tokenize the copy stored in required_fields rather than the moved-from local.
        const char * query_begin = required_fields.back().c_str();
        const char * query_end = query_begin + required_fields.back().size();
        DB::Tokens tokens(query_begin, query_end);
        DB::IParser::Pos token_iterator(tokens, max_parser_depth, max_parser_backtracks);
        DB::ASTPtr json_path_ast;
        DB::ParserJSONPath path_parser;
        DB::Expected expected;
        if (!path_parser.parse(token_iterator, json_path_ast, expected))
            path_parsed = false;
        json_path_asts.push_back(json_path_ast);
    }

    if (!path_parsed)
    {
        // At least one path is invalid: return defaults for every path and every row.
        const size_t rows = first_column.column->size();
        for (size_t i = 0; i < rows; ++i)
        {
            for (size_t j = 0; j < tuple_columns.size(); ++j)
                tuple_columns[j]->insertDefault();
        }
        return DB::ColumnTuple::create(std::move(tuple_columns));
    }

    if (!isString(first_column.type))
        throw DB::Exception(
            DB::ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
            "The first argument of function {} should be a string containing JSON, illegal type: "
            "{}",
            String(name),
            first_column.type->getName());

    // Accept either a full string column or a constant wrapping one.
    const DB::ColumnPtr & arg_json = first_column.column;
    const auto * col_json_const = typeid_cast<const DB::ColumnConst *>(arg_json.get());
    const auto * col_json_string
        = typeid_cast<const DB::ColumnString *>(col_json_const ? col_json_const->getDataColumnPtr().get() : arg_json.get());
    if (!col_json_string)
        throw DB::Exception(DB::ErrorCodes::ILLEGAL_COLUMN, "Illegal column {}", arg_json->getName());

    const DB::ColumnString::Chars & chars = col_json_string->getChars();
    const DB::ColumnString::Offsets & offsets = col_json_string->getOffsets();

    Impl impl;
    JSONParser parser;
    using Element = typename JSONParser::Element;
    Element document;
    bool document_ok = false;
    if (col_json_const)
    {
        // A constant JSON argument is parsed once and reused for every row.
        // NOTE(review): the `- 1` presumably drops a terminating byte stored after the
        // string in ColumnString - confirm against the ColumnString layout in use.
        std::string_view json{reinterpret_cast<const char *>(chars.data()), offsets[0] - 1};
        document_ok = safeParseJson(json, parser, document);
    }

    const size_t tuple_size = tuple_columns.size();
    std::vector<std::shared_ptr<DB::GeneratorJSONPath<JSONParser>>> generator_json_paths;
    generator_json_paths.reserve(json_path_asts.size());
    std::transform(
        json_path_asts.begin(),
        json_path_asts.end(),
        std::back_inserter(generator_json_paths),
        [](const auto & ast) { return std::make_shared<DB::GeneratorJSONPath<JSONParser>>(ast); });

    for (const auto i : collections::range(0, arguments[0].column->size()))
    {
        if (!col_json_const)
        {
            // NOTE(review): for i == 0 this reads offsets[i - 1]; presumably the offsets
            // container permits index -1 and yields 0 there - confirm.
            std::string_view json{reinterpret_cast<const char *>(&chars[offsets[i - 1]]), offsets[i] - offsets[i - 1] - 1};
            document_ok = safeParseJson(json, parser, document);
        }

        if (document_ok)
        {
            for (size_t j = 0; j < tuple_size; ++j)
            {
                // Generators are reused across rows, so reset them before each use.
                generator_json_paths[j]->reinitialize();
                if (!impl.insertResultToColumn(*tuple_columns[j], document, *generator_json_paths[j], path_has_asterisk[j]))
                    tuple_columns[j]->insertDefault();
            }
        }
        else
        {
            // Unparseable JSON: every requested path gets the default for this row.
            for (size_t j = 0; j < tuple_size; ++j)
                tuple_columns[j]->insertDefault();
        }
    }
    return DB::ColumnTuple::create(std::move(tuple_columns));
}