DB::ColumnPtr innerExecuteImpl()

in cpp-ch/local-engine/Functions/SparkFunctionGetJsonObject.h [683:805]


    /// Evaluates several JSON paths against every row of a JSON string column.
    ///
    /// arguments[0] is the JSON text column; it must be of String type and may be constant.
    /// arguments[1] must be a constant column whose value is a '|'-separated list of JSON
    /// paths. Each path is passed through JSONPathNormalizer::normalize and then parsed with
    /// DB::ParserJSONPath.
    ///
    /// Returns a ColumnTuple with one Nullable(String) column per path and one row per row of
    /// arguments[0]. A row receives the column default when the JSON document cannot be parsed
    /// or when Impl::insertResultToColumn reports that nothing was inserted. If any path fails
    /// to parse, every row of every column receives the default.
    ///
    /// Throws DB::Exception with ILLEGAL_TYPE_OF_ARGUMENT when arguments[1] is not constant or
    /// arguments[0] is not a String, and with ILLEGAL_COLUMN when the underlying column of
    /// arguments[0] is not a ColumnString.
    DB::ColumnPtr innerExecuteImpl(const DB::ColumnsWithTypeAndName & arguments) const
    {
        const auto & first_column = arguments[0];

        // The path list is parsed once for the whole block, so it has to be a constant.
        const auto * required_fields_col = typeid_cast<const DB::ColumnConst *>(arguments[1].column.get());
        if (!required_fields_col)
            throw DB::Exception(
                DB::ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "The second argument of function {} must be a constant column", getName());

        const DB::DataTypePtr str_type = DB::makeNullable(std::make_shared<DB::DataTypeString>());
        DB::MutableColumns tuple_columns;
        std::vector<DB::ASTPtr> json_path_asts;
        // Keeps the normalized path strings alive for the duration of the call.
        std::vector<String> required_fields;
        std::vector<bool> path_has_asterisk;

        // Parser limits do not depend on the path, so read them once.
        const auto & settings = context->getSettingsRef();
        const UInt32 max_parser_depth = static_cast<UInt32>(settings[DB::Setting::max_parser_depth]);
        const UInt32 max_parser_backtracks = static_cast<UInt32>(settings[DB::Setting::max_parser_backtracks]);

        const std::string json_fields = required_fields_col->getDataAt(0).toString();
        Poco::StringTokenizer tokenizer(json_fields, "|");
        bool path_parsed = true;
        for (const auto & field : tokenizer)
        {
            auto normalized_field = JSONPathNormalizer::normalize(field);
            path_has_asterisk.emplace_back(normalized_field.find("[*]") != std::string::npos);

            required_fields.push_back(std::move(normalized_field));
            tuple_columns.emplace_back(str_type->createColumn());

            const String & path = required_fields.back();
            const char * query_begin = path.data();
            const char * query_end = query_begin + path.size();
            DB::Tokens tokens(query_begin, query_end);
            DB::IParser::Pos token_iterator(tokens, max_parser_depth, max_parser_backtracks);
            DB::ASTPtr json_path_ast;
            DB::ParserJSONPath path_parser;
            DB::Expected expected;
            // Keep iterating after a failure: one result column is still needed per path.
            if (!path_parser.parse(token_iterator, json_path_ast, expected))
                path_parsed = false;
            json_path_asts.push_back(json_path_ast);
        }

        if (!path_parsed)
        {
            // At least one path is invalid: emit a default for every row of every column.
            const size_t rows = first_column.column->size();
            for (size_t i = 0; i < rows; ++i)
            {
                for (auto & tuple_column : tuple_columns)
                    tuple_column->insertDefault();
            }
            return DB::ColumnTuple::create(std::move(tuple_columns));
        }

        if (!isString(first_column.type))
            throw DB::Exception(
                DB::ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
                "The first argument of function {} should be a string containing JSON, illegal type: "
                "{}",
                String(name),
                first_column.type->getName());

        // For a constant JSON argument, look through the ColumnConst wrapper at its data column.
        const DB::ColumnPtr & arg_json = first_column.column;
        const auto * col_json_const = typeid_cast<const DB::ColumnConst *>(arg_json.get());
        const auto * col_json_string
            = typeid_cast<const DB::ColumnString *>(col_json_const ? col_json_const->getDataColumnPtr().get() : arg_json.get());
        if (!col_json_string)
            throw DB::Exception(DB::ErrorCodes::ILLEGAL_COLUMN, "Illegal column {}", arg_json->getName());
        const DB::ColumnString::Chars & chars = col_json_string->getChars();
        const DB::ColumnString::Offsets & offsets = col_json_string->getOffsets();

        Impl impl;
        JSONParser parser;
        using Element = typename JSONParser::Element;
        Element document;
        bool document_ok = false;
        if (col_json_const)
        {
            // A constant document is parsed once and reused for every row.
            // NOTE(review): the "- 1" presumably drops a trailing terminator byte stored per string - confirm.
            std::string_view json{reinterpret_cast<const char *>(chars.data()), offsets[0] - 1};
            document_ok = safeParseJson(json, parser, document);
        }

        const size_t tuple_size = tuple_columns.size();
        std::vector<std::shared_ptr<DB::GeneratorJSONPath<JSONParser>>> generator_json_paths;
        generator_json_paths.reserve(json_path_asts.size());
        std::transform(
            json_path_asts.begin(),
            json_path_asts.end(),
            std::back_inserter(generator_json_paths),
            [](const auto & ast) { return std::make_shared<DB::GeneratorJSONPath<JSONParser>>(ast); });

        for (const auto i : collections::range(0, first_column.column->size()))
        {
            if (!col_json_const)
            {
                // NOTE(review): for i == 0 this reads offsets[i - 1]; presumably the offsets
                // container yields 0 at index -1 - confirm against ColumnString::Offsets.
                std::string_view json{reinterpret_cast<const char *>(&chars[offsets[i - 1]]), offsets[i] - offsets[i - 1] - 1};
                document_ok = safeParseJson(json, parser, document);
            }

            if (!document_ok)
            {
                for (size_t j = 0; j < tuple_size; ++j)
                    tuple_columns[j]->insertDefault();
                continue;
            }

            for (size_t j = 0; j < tuple_size; ++j)
            {
                // Generators are reused across rows, so reset each one before use.
                generator_json_paths[j]->reinitialize();
                if (!impl.insertResultToColumn(*tuple_columns[j], document, *generator_json_paths[j], path_has_asterisk[j]))
                    tuple_columns[j]->insertDefault();
            }
        }

        return DB::ColumnTuple::create(std::move(tuple_columns));
    }