cpp-ch/local-engine/Functions/SparkParseURL.cpp (445 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnNullable.h>
#include <Columns/IColumn.h>
#include <DataTypes/DataTypeNullable.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsStringSearchToString.h>
#include <Functions/IFunction.h>
#include <Functions/URL/domain.h>
#include <Poco/Logger.h>
#include <Poco/URI.h>
#include <cstring>
#include <memory>
/// Declarations of the error codes thrown by the function wrappers in this file.
/// They are only declared here (`extern`); their definitions are not part of this file.
namespace DB
{
namespace ErrorCodes
{
/// Thrown when an argument column is not of the expected column class.
extern const int ILLEGAL_COLUMN;
/// Thrown when an argument's data type is rejected by getReturnTypeImpl.
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
}
namespace local_engine
{
/// Row-wise driver that applies a URL-part `Extractor` to every value of a string column.
/// Unlike a plain substring extractor, a row is allowed to produce NULL.
///
/// `Extractor` must provide:
///   - `static size_t getReserveLengthForElement()`: per-row size hint for the output buffer;
///   - `static void execute(const Poco::URI &, const String &, DB::Pos &, size_t &)`: sets the
///     output pointer/length to the extracted part, or to `nullptr`/0 to request NULL.
template <typename Extractor>
struct ExtractNullableSubstringImpl
{
    /// Fills `res_data`/`res_offsets` with one value per input row and appends one flag per row
    /// to `null_map` (1 = NULL, 0 = value present).
    ///
    /// @param data         bytes of all input values; one trailing byte per value is skipped
    /// @param offsets      end offset of every input value inside `data`
    /// @param res_data     output bytes; every output value is followed by a zero byte
    /// @param res_offsets  output end offsets, resized to the number of input rows
    /// @param null_map     column receiving the per-row NULL flags
    ///
    /// A row becomes NULL when the URL cannot be parsed (`Poco::SyntaxException`) or when the
    /// extractor reports a null pointer. Any other exception propagates to the caller.
    static void vector(const DB::ColumnString::Chars & data, const DB::ColumnString::Offsets & offsets,
        DB::ColumnString::Chars & res_data, DB::ColumnString::Offsets & res_offsets, DB::IColumn & null_map)
    {
        const size_t size = offsets.size();
        res_offsets.resize_exact(size);
        res_data.reserve_exact(size * Extractor::getReserveLengthForElement());
        null_map.reserve(size);

        size_t prev_offset = 0;
        size_t res_offset = 0;

        /// Appends one output row. `start == nullptr` produces a NULL row holding an empty string.
        auto append_row = [&](size_t row, DB::Pos start, size_t length)
        {
            /// A NULL row must not reserve payload bytes that would stay uninitialised.
            if (!start)
                length = 0;

            res_data.resize_exact(res_data.size() + length + 1);
            if (start)
            {
                /// Plain memcpy: the source usually lives in an ordinary std::string, which has no
                /// padding behind it, so a copy routine that over-reads the source is not safe here.
                memcpy(&res_data[res_offset], start, length);
                null_map.insert(0);
            }
            else
            {
                null_map.insert(1);
            }
            res_offset += length + 1;
            res_data[res_offset - 1] = 0;
            res_offsets[row] = res_offset;
        };

        for (size_t i = 0; i < size; ++i)
        {
            const String s(reinterpret_cast<const char *>(&data[prev_offset]), offsets[i] - prev_offset - 1);
            prev_offset = offsets[i];
            try
            {
                Poco::URI uri(s, false);
                DB::Pos start = nullptr;
                size_t length = 0;
                Extractor::execute(uri, s, start, length);
                /// Copy while `uri` is still alive: `start` may point into strings owned by it,
                /// and those are destroyed as soon as this scope is left.
                append_row(i, start, length);
            }
            catch (const Poco::SyntaxException &)
            {
                append_row(i, nullptr, 0);
            }
        }
    }
};
/// Adaptor for single-argument functions that map a string column to a Nullable string column.
///
/// `Impl::vector` does the per-row work and fills both the nested string column and the null map;
/// `Name::name` supplies the function name; `is_injective` is reported through isInjective().
template <typename Impl, typename Name, bool is_injective = false>
class FunctionStringToNullableString : public DB::IFunction
{
public:
    static constexpr auto name = Name::name;

    static DB::FunctionPtr create(DB::ContextPtr) { return std::make_shared<FunctionStringToNullableString>(); }

    String getName() const override { return name; }

    size_t getNumberOfArguments() const override { return 1; }

    bool isInjective(const DB::ColumnsWithTypeAndName &) const override { return is_injective; }

    bool useDefaultImplementationForConstants() const override { return true; }

    bool isSuitableForShortCircuitArgumentsExecution(const DB::DataTypesWithConstInfo & /*arguments*/) const override { return true; }

    /// The result type is the argument type wrapped in Nullable.
    /// NOTE(review): FixedString passes this check, but executeImpl below only handles
    /// ColumnString and throws ILLEGAL_COLUMN for anything else - confirm this is intended.
    DB::DataTypePtr getReturnTypeImpl(const DB::DataTypes & arguments) const override
    {
        const auto & argument_type = arguments[0];
        if (DB::isStringOrFixedString(argument_type))
            return DB::makeNullable(argument_type);

        throw DB::Exception(DB::ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}",
            argument_type->getName(), getName());
    }

    /// Runs `Impl::vector` over the argument column and wraps the outcome in a ColumnNullable.
    /// Throws ILLEGAL_COLUMN when the argument is not a ColumnString.
    DB::ColumnPtr executeImpl(const DB::ColumnsWithTypeAndName & arguments, const DB::DataTypePtr &, size_t /*input_rows_count*/) const override
    {
        const DB::ColumnPtr input = arguments[0].column;
        const DB::ColumnString * strings = DB::checkAndGetColumn<DB::ColumnString>(input.get());
        if (!strings)
            throw DB::Exception(DB::ErrorCodes::ILLEGAL_COLUMN, "Illegal column {} of argument of function {}",
                arguments[0].column->getName(), getName());

        auto null_flags = DB::DataTypeUInt8().createColumn();
        auto extracted = DB::ColumnString::create();
        Impl::vector(strings->getChars(), strings->getOffsets(), extracted->getChars(), extracted->getOffsets(), *null_flags);
        return DB::ColumnNullable::create(std::move(extracted), std::move(null_flags));
    }
};
/// Adaptor for two-argument functions (string column, constant string needle) that return a
/// Nullable(String) column.
///
/// `Impl::vector(column, needle, result_column, null_map)` does the per-row work;
/// `Name::name` supplies the function name.
template <typename Impl, typename Name>
class FunctionsStringSearchToNullableString : public DB::IFunction
{
public:
    static constexpr auto name = Name::name;

    static DB::FunctionPtr create(DB::ContextPtr)
    {
        return std::make_shared<FunctionsStringSearchToNullableString>();
    }

    String getName() const override
    {
        return name;
    }

    size_t getNumberOfArguments() const override
    {
        return 2;
    }

    bool useDefaultImplementationForConstants() const override
    {
        return true;
    }

    /// The needle (argument index 1) is declared as always constant.
    DB::ColumnNumbers getArgumentsThatAreAlwaysConstant() const override
    {
        return {1};
    }

    bool isSuitableForShortCircuitArgumentsExecution(const DB::DataTypesWithConstInfo & /*arguments*/) const override
    {
        return true;
    }

    /// Both arguments must be String; the result is always Nullable(String).
    DB::DataTypePtr getReturnTypeImpl(const DB::DataTypes & arguments) const override
    {
        /// Arguments are validated in order, so the first offending one is the one reported.
        for (size_t idx = 0; idx < 2; ++idx)
        {
            if (!isString(arguments[idx]))
                throw DB::Exception(DB::ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}",
                    arguments[idx]->getName(), getName());
        }
        return DB::makeNullable(std::make_shared<DB::DataTypeString>());
    }

    /// Throws ILLEGAL_COLUMN when the needle is not a constant column or when the first
    /// argument is not a ColumnString (checked in that order).
    DB::ColumnPtr executeImpl(const DB::ColumnsWithTypeAndName & arguments, const DB::DataTypePtr &, size_t /*input_rows_count*/) const override
    {
        const DB::ColumnPtr haystack_holder = arguments[0].column;
        const DB::ColumnPtr needle_holder = arguments[1].column;

        const auto * const_needle = typeid_cast<const DB::ColumnConst *>(&*needle_holder);
        if (!const_needle)
            throw DB::Exception(DB::ErrorCodes::ILLEGAL_COLUMN, "Second argument of function {} must be constant string", getName());

        const DB::ColumnString * haystack = DB::checkAndGetColumn<DB::ColumnString>(haystack_holder.get());
        if (!haystack)
            throw DB::Exception(DB::ErrorCodes::ILLEGAL_COLUMN, "Illegal column {} of argument of function {}",
                arguments[0].column->getName(), getName());

        auto values = DB::ColumnString::create();
        /// The null map starts with one zero flag per input row; Impl overwrites the flags.
        auto null_flags = DB::ColumnUInt8::create(haystack->size(), 0);
        Impl::vector(*haystack, const_needle->getValue<String>(), *values, *null_flags);
        return DB::ColumnNullable::create(std::move(values), std::move(null_flags));
    }
};
/// Name tag for the function returning the whole query string of a URL.
/// This differs from ClickHouse's extractURLParameters, which returns an array result.
struct NameSparkExtractURLQuery
{
    static constexpr const char * name = "spark_parse_url_query";
};
/// Extractor yielding the raw query string of a parsed URL, or NULL when the original text
/// does not reach a '?' before any other delimiter checked below.
struct SparkExtractURLQuery
{
    /// Per-row size hint used by the caller to pre-reserve the output buffer.
    static size_t getReserveLengthForElement() { return 30; }

    /// @param uri       parsed form of `data`
    /// @param data      original URL text
    /// @param res_data  set to the first byte of the raw query (owned by `uri`), or nullptr for NULL
    /// @param res_size  set to the query length, or 0 for NULL
    static void execute(const Poco::URI & uri, const String & data, DB::Pos & res_data, size_t & res_size)
    {
        DB::Pos const text_begin = data.data();
        DB::Pos const text_end = text_begin + data.size();

        /// With a leading "<scheme>://" only '?' and '#' are delimiters; otherwise ':' is one too.
        const String scheme_prefix = uri.getScheme() + "://";
        DB::Pos first_delimiter = nullptr;
        if (data.starts_with(scheme_prefix))
            first_delimiter = find_first_symbols<'?', '#'>(text_begin, text_end);
        else
            first_delimiter = find_first_symbols<'?', '#', ':'>(text_begin, text_end);

        /// NOTE(review): presumably find_first_symbols returns the end pointer when nothing
        /// matches; the dereference then reads the string terminator and the row is NULL - confirm.
        const bool query_comes_first = !first_delimiter || *first_delimiter == '?';
        if (!query_comes_first)
        {
            res_data = nullptr;
            res_size = 0;
            return;
        }

        const auto & raw_query = uri.getRawQuery();
        res_data = raw_query.data();
        res_size = raw_query.size();
    }
};
/// Concrete function type: the query extractor driven row by row through
/// ExtractNullableSubstringImpl and exposed under NameSparkExtractURLQuery::name.
using SparkFunctionURLQuery = FunctionStringToNullableString<ExtractNullableSubstringImpl<SparkExtractURLQuery>, NameSparkExtractURLQuery>;
/// Registers SparkFunctionURLQuery with the function factory.
/// NOTE(review): `factory` is presumably supplied by the REGISTER_FUNCTION macro, which is defined outside this file.
REGISTER_FUNCTION(SparkFunctionURLQuery)
{
factory.registerFunction<SparkFunctionURLQuery>();
}
/// Name tag for the function returning the value of a single query parameter.
struct NameSparkExtractURLOneQuery
{
    static constexpr const char * name = "spark_parse_url_one_query";
};
/// Extracts the value of one named query parameter from every URL of a string column.
struct SparkExtractURLOneQuery
{
    /// @param col       input URLs
    /// @param pattern   parameter name to look for (compared byte-for-byte with the text before '=')
    /// @param res_col   receives exactly one value per input row (a default value for NULL rows)
    /// @param null_map  must be a ColumnUInt8 already sized to `col.size()`; flag i is set to 1 for NULL rows
    ///
    /// A row is NULL when the URL fails to parse, when the text does not reach a '?' before the
    /// other delimiters checked below, or when no "pattern=value" segment exists in the query.
    static void vector(const DB::ColumnString & col, std::string pattern, DB::IColumn & res_col, DB::IColumn & null_map)
    {
        auto & null_flags = assert_cast<DB::ColumnUInt8 &>(null_map).getData();
        const size_t rows = col.size();

        /// Scans one '&'-separated segment. For every '=' in it, the text before that '=' is
        /// compared with `pattern`; on the first hit the remainder of the segment is emitted.
        auto try_emit_value = [&](DB::Pos segment, size_t segment_len) -> bool
        {
            for (size_t eq = 0; eq < segment_len; ++eq)
            {
                if (segment[eq] != '=')
                    continue;
                if (pattern.compare(0, String::npos, segment, eq) == 0)
                {
                    res_col.insertData(segment + eq + 1, segment_len - eq - 1);
                    return true;
                }
            }
            return false;
        };

        for (size_t row = 0; row < rows; ++row)
        {
            try
            {
                const String url = col.getDataAt(row).toString();
                Poco::URI uri(url, false);

                /// With a leading "<scheme>://" only '?' and '#' are delimiters; otherwise ':' is one too.
                DB::Pos const url_begin = url.data();
                DB::Pos const url_end = url_begin + url.size();
                const String scheme_prefix = uri.getScheme() + "://";
                DB::Pos first_delimiter = nullptr;
                if (url.starts_with(scheme_prefix))
                    first_delimiter = find_first_symbols<'?', '#'>(url_begin, url_end);
                else
                    first_delimiter = find_first_symbols<'?', '#', ':'>(url_begin, url_end);

                if (first_delimiter && *first_delimiter != '?')
                {
                    res_col.insertDefault();
                    null_flags[row] = 1;
                    continue;
                }

                const String & query = uri.getRawQuery();
                DB::Pos segment = query.data();
                DB::Pos const query_end = segment + query.size();

                /// Walk the segments in order and stop at the first one that yields a value.
                bool found = false;
                while (!found && segment < query_end)
                {
                    DB::Pos const separator = find_first_symbols<'&'>(segment, query_end);
                    found = try_emit_value(segment, static_cast<size_t>(separator - segment));
                    if (separator == query_end)
                        break;
                    segment = separator + 1;
                }

                if (!found)
                    res_col.insertDefault();
                null_flags[row] = !found;
            }
            catch (const Poco::SyntaxException &)
            {
                /// Unparsable URL: the row becomes NULL.
                res_col.insertDefault();
                null_flags[row] = 1;
            }
        }
    }
};
/// Concrete function type: the single-parameter extractor wrapped in the two-argument
/// (string column, constant needle) adaptor and exposed under NameSparkExtractURLOneQuery::name.
using SparkFunctionURLOneQuery = FunctionsStringSearchToNullableString<SparkExtractURLOneQuery, NameSparkExtractURLOneQuery>;
/// Registers SparkFunctionURLOneQuery with the function factory.
/// NOTE(review): `factory` is presumably supplied by the REGISTER_FUNCTION macro, which is defined outside this file.
REGISTER_FUNCTION(SparkFunctionURLOneQuery)
{
factory.registerFunction<SparkFunctionURLOneQuery>();
}
/// Extractor yielding the host of a parsed URL; an empty host is reported as NULL.
struct SparkExtractURLHost
{
    /// Per-row size hint used by the caller to pre-reserve the output buffer.
    static size_t getReserveLengthForElement() { return 30; }

    /// @param uri       parsed URL
    /// @param res_data  set to the host bytes (owned by `uri`), or nullptr for NULL
    /// @param res_size  set to the host length, or 0 for NULL
    static void execute(const Poco::URI & uri, const String &, DB::Pos & res_data, size_t & res_size)
    {
        const auto & host_part = uri.getHost();
        const bool has_host = !host_part.empty();
        res_data = has_host ? host_part.data() : nullptr;
        res_size = has_host ? host_part.size() : 0;
    }
};
/// Name tag for the host-extracting function.
struct NameSparkExtractURLHost
{
    static constexpr const char * name = "spark_parse_url_host";
};
/// Concrete function type: the host extractor driven row by row through
/// ExtractNullableSubstringImpl and exposed under NameSparkExtractURLHost::name.
using SparkFunctionURLHost = FunctionStringToNullableString<ExtractNullableSubstringImpl<SparkExtractURLHost>, NameSparkExtractURLHost>;
/// Registers SparkFunctionURLHost with the function factory.
/// NOTE(review): `factory` is presumably supplied by the REGISTER_FUNCTION macro, which is defined outside this file.
REGISTER_FUNCTION(SparkFunctionURLHost)
{
factory.registerFunction<SparkFunctionURLHost>();
}
/// Name tag for the path-extracting function.
struct NameSparkExtractURLPath
{
    static constexpr const char * name = "spark_parse_url_path";
};
/// Extractor yielding the path of a parsed URL. The path is passed through as-is, so an
/// empty path is reported with whatever pointer the path string exposes and length 0.
struct SparkExtractURLPath
{
    /// Per-row size hint used by the caller to pre-reserve the output buffer.
    static size_t getReserveLengthForElement() { return 25; }

    /// @param uri       parsed URL
    /// @param res_data  set to the path bytes (owned by `uri`)
    /// @param res_size  set to the path length
    static void execute(const Poco::URI & uri, const String &, DB::Pos & res_data, size_t & res_size)
    {
        const auto & path_part = uri.getPath();
        res_size = path_part.size();
        res_data = path_part.data();
    }
};
/// Concrete function type: the path extractor driven row by row through
/// ExtractNullableSubstringImpl and exposed under NameSparkExtractURLPath::name.
using SparkFunctionURLPath = FunctionStringToNullableString<ExtractNullableSubstringImpl<SparkExtractURLPath>, NameSparkExtractURLPath>;
/// Registers SparkFunctionURLPath with the function factory.
/// NOTE(review): `factory` is presumably supplied by the REGISTER_FUNCTION macro, which is defined outside this file.
REGISTER_FUNCTION(SparkFunctionURLPath)
{
factory.registerFunction<SparkFunctionURLPath>();
}
/// Name tag for the user-info-extracting function.
struct NameSparkExtractUserInfo
{
    static constexpr const char * name = "spark_parse_url_userinfo";
};
/// Extractor yielding the user-info part of a parsed URL; empty user info is reported as NULL.
struct SparkExtractURLUserInfo
{
    /// Per-row size hint used by the caller to pre-reserve the output buffer.
    static size_t getReserveLengthForElement() { return 25; }

    /// @param uri       parsed URL
    /// @param res_data  set to the user-info bytes (owned by `uri`), or nullptr for NULL
    /// @param res_size  set to the user-info length, or 0 for NULL
    static void execute(const Poco::URI & uri, const String &, DB::Pos & res_data, size_t & res_size)
    {
        const auto & user_part = uri.getUserInfo();
        if (user_part.empty())
        {
            res_data = nullptr;
            res_size = 0;
            return;
        }
        res_data = user_part.data();
        res_size = user_part.size();
    }
};
/// Concrete function type: the user-info extractor driven row by row through
/// ExtractNullableSubstringImpl and exposed under NameSparkExtractUserInfo::name.
using SparkFunctionURLUserInfo = FunctionStringToNullableString<ExtractNullableSubstringImpl<SparkExtractURLUserInfo>, NameSparkExtractUserInfo>;
/// Registers SparkFunctionURLUserInfo with the function factory.
/// NOTE(review): `factory` is presumably supplied by the REGISTER_FUNCTION macro, which is defined outside this file.
REGISTER_FUNCTION(SparkFunctionURLUserInfo)
{
factory.registerFunction<SparkFunctionURLUserInfo>();
}
/// Name tag for the fragment ("ref") extracting function.
struct NameSparkExtractURLRef
{
    static constexpr const char * name = "spark_parse_url_ref";
};
/// Extractor yielding the fragment (the part after '#') of a URL.
struct SparkExtractURLRef
{
    /// Per-row size hint used by the caller to pre-reserve the output buffer.
    static size_t getReserveLengthForElement() { return 25; }

    /// @param uri       parsed form of `data`
    /// @param data      original URL text
    /// @param res_data  set to the fragment bytes (owned by `uri` or by `data`), or nullptr for NULL
    /// @param res_size  set to the fragment length, or 0 for NULL
    ///
    /// NOTE(review): this assumes getFragment() returns a reference to storage owned by `uri`;
    /// if it returns a temporary, the pointer handed out would not survive this call - confirm.
    static void execute(const Poco::URI & uri, const String & data, DB::Pos & res_data, size_t & res_size)
    {
        const auto & parsed_fragment = uri.getFragment();

        /// Use the parsed fragment only when it is non-empty and occurs verbatim in the input.
        const bool fragment_usable = !parsed_fragment.empty() && data.find(parsed_fragment) != std::string::npos;
        if (fragment_usable)
        {
            res_data = parsed_fragment.data();
            res_size = parsed_fragment.size();
            return;
        }

        /// Otherwise fall back to the original text: everything after the first '#'.
        DB::Pos const text_end = data.data() + data.size();
        const auto * hash_pos = find_first_symbols<'#'>(data.data(), text_end);
        if (hash_pos && hash_pos < text_end)
        {
            res_data = hash_pos + 1;
            res_size = text_end - res_data;
        }
        else
        {
            res_data = nullptr;
            res_size = 0;
        }
    }
};
/// Concrete function type: the fragment extractor driven row by row through
/// ExtractNullableSubstringImpl and exposed under NameSparkExtractURLRef::name.
using SparkFunctionURLRef = FunctionStringToNullableString<ExtractNullableSubstringImpl<SparkExtractURLRef>, NameSparkExtractURLRef>;
/// Registers SparkFunctionURLRef with the function factory.
/// NOTE(review): `factory` is presumably supplied by the REGISTER_FUNCTION macro, which is defined outside this file.
REGISTER_FUNCTION(SparkFunctionURLRef)
{
factory.registerFunction<SparkFunctionURLRef>();
}
/// Name tag for the "file" (path plus query) extracting function.
struct NameSparkExtractURLFile
{
    static constexpr const char * name = "spark_parse_url_file";
};
/// Extractor yielding the "file" part of a URL: the path, followed by "?<raw query>" when the
/// text after "://" contains a '?'.
struct SparkExtractURLFile
{
    /// Per-row size hint used by the caller to pre-reserve the output buffer.
    static size_t getReserveLengthForElement() { return 25; }

    /// @param uri       parsed form of `data`
    /// @param data      original URL text
    /// @param res_data  set to the first byte of the result, or nullptr for NULL
    /// @param res_size  set to the result length, or 0 for NULL
    ///
    /// The result points into `data`, into `uri`, or into a thread-local buffer owned by this
    /// function. In the last case it stays valid only until the next call on the same thread,
    /// so the caller must copy the bytes before calling again.
    static void execute(const Poco::URI & uri, const String & data, DB::Pos & res_data, size_t & res_size)
    {
        const static String protocol_delim = "://";
        DB::Pos const data_end = data.data() + data.size();

        const auto * protocol_delim_pos = static_cast<DB::Pos>(memmem(data.data(), data.size(), protocol_delim.data(), protocol_delim.size()));
        if (!protocol_delim_pos)
        {
            /// No "://": a ':' that is followed by more text makes the row NULL,
            /// otherwise the whole input is returned unchanged.
            auto colon_pos = find_first_symbols<':'>(data.data(), data_end);
            if (colon_pos && colon_pos + 1 < data_end)
            {
                res_data = nullptr;
                res_size = 0;
            }
            else
            {
                res_data = data.data();
                res_size = data.size();
            }
            return;
        }

        const String & path = uri.getPath();
        DB::Pos query_begin_pos = find_first_symbols<'?'>(protocol_delim_pos + protocol_delim.size(), data_end);
        if (!(query_begin_pos && *query_begin_pos == '?'))
        {
            /// No query delimiter after the scheme: the file is just the path.
            res_data = path.data();
            res_size = path.size();
            return;
        }

        /// The combined "path?query" text has to outlive this call because the caller reads
        /// `res_data` afterwards. A function-local String would already be destroyed by then,
        /// so the text is assembled in a per-thread buffer instead.
        static thread_local String file_with_query;
        file_with_query.clear();

        const String & query = uri.getRawQuery();
        if (!(path.empty() && query.empty()))
        {
            file_with_query.reserve(path.size() + 1 + query.size());
            file_with_query += path;
            file_with_query += '?';
            file_with_query += query;
        }
        res_data = file_with_query.data();
        res_size = file_with_query.size();
    }
};
/// Concrete function type: the file extractor driven row by row through
/// ExtractNullableSubstringImpl and exposed under NameSparkExtractURLFile::name.
using SparkFunctionURLFile = FunctionStringToNullableString<ExtractNullableSubstringImpl<SparkExtractURLFile>, NameSparkExtractURLFile>;
/// Registers SparkFunctionURLFile with the function factory.
/// NOTE(review): `factory` is presumably supplied by the REGISTER_FUNCTION macro, which is defined outside this file.
REGISTER_FUNCTION(SparkFunctionURLFile)
{
factory.registerFunction<SparkFunctionURLFile>();
}
/// Name tag for the authority-extracting function.
struct NameSparkExtractURLAuthority
{
    static constexpr const char * name = "spark_parse_url_authority";
};
/// Extractor yielding the authority part of a parsed URL; an empty authority is reported as NULL.
struct SparkExtractURLAuthority
{
    /// Per-row size hint used by the caller to pre-reserve the output buffer.
    static size_t getReserveLengthForElement() { return 25; }

    /// @param uri       parsed URL
    /// @param res_data  set to the authority bytes, or nullptr for NULL
    /// @param res_size  set to the authority length, or 0 for NULL
    ///
    /// The result points into a thread-local buffer owned by this function. It stays valid only
    /// until the next call on the same thread, so the caller must copy the bytes before calling again.
    static void execute(const Poco::URI & uri, const String &, DB::Pos & res_data, size_t & res_size)
    {
        /// getAuthority() builds its result as a new string. Binding that to a local reference
        /// would keep it alive only until this function returns, leaving `res_data` dangling
        /// for the caller. Copying into a per-thread buffer keeps the bytes readable.
        static thread_local String authority;
        authority = uri.getAuthority();

        if (authority.empty())
        {
            res_data = nullptr;
            res_size = 0;
            return;
        }
        res_data = authority.data();
        res_size = authority.size();
    }
};
/// Concrete function type: the authority extractor driven row by row through
/// ExtractNullableSubstringImpl and exposed under NameSparkExtractURLAuthority::name.
using SparkFunctionURLAuthority
= FunctionStringToNullableString<ExtractNullableSubstringImpl<SparkExtractURLAuthority>, NameSparkExtractURLAuthority>;
/// Registers SparkFunctionURLAuthority with the function factory.
/// NOTE(review): `factory` is presumably supplied by the REGISTER_FUNCTION macro, which is defined outside this file.
REGISTER_FUNCTION(SparkFunctionURLAuthority)
{
factory.registerFunction<SparkFunctionURLAuthority>();
}
/// Name tag for the function that always yields NULL.
struct NameSparkExtractURLInvalid
{
    static constexpr const char * name = "spark_parse_url_invalid";
};
/// Extractor that ignores its input and reports NULL for every row.
struct SparkExtractURLInvalid
{
    /// Per-row size hint; only the one terminator byte per row is needed.
    static size_t getReserveLengthForElement() { return 1; }

    /// @param res_data  always set to nullptr (NULL result)
    /// @param res_size  always set to 0
    static void execute(const Poco::URI &, const String &, DB::Pos & res_data, size_t & res_size)
    {
        res_size = 0;
        res_data = nullptr;
    }
};
/// Concrete function type: the always-NULL extractor driven row by row through
/// ExtractNullableSubstringImpl and exposed under NameSparkExtractURLInvalid::name.
using SparkFunctionURLInvalid = FunctionStringToNullableString<ExtractNullableSubstringImpl<SparkExtractURLInvalid>, NameSparkExtractURLInvalid>;
/// Registers SparkFunctionURLInvalid with the function factory.
/// NOTE(review): `factory` is presumably supplied by the REGISTER_FUNCTION macro, which is defined outside this file.
REGISTER_FUNCTION(SparkFunctionURLInvalid)
{
factory.registerFunction<SparkFunctionURLInvalid>();
}
}