cpp-ch/local-engine/Storages/ch_parquet/OptimizedArrowColumnToCHColumn.cpp (590 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "OptimizedArrowColumnToCHColumn.h"
#if USE_ARROW || USE_ORC || USE_PARQUET
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <type_traits>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnUnique.h>
#include <Columns/ColumnsNumber.h>
#include <Core/Block.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/NestedUtils.h>
#include <Interpreters/castColumn.h>
#include <Processors/Chunk.h>
#include <arrow/array.h>
#include <arrow/builder.h>
#include <base/types.h>
#include <Poco/Logger.h>
#include <Common/CHUtil.h>
#include <Common/DateLUTImpl.h>
#include <Common/Stopwatch.h>
#include <Common/logger_useful.h>
#include <Common/quoteString.h>
#include "arrow/column_reader.h"
/// Arrow numeric type ids that readColumnFromArrowColumn dispatches to readColumnWithNumericData,
/// each paired with the ClickHouse C++ element type of the resulting column.
/// UINT16 and UINT32 are processed separately, see comments in readColumnFromArrowColumn.
#define FOR_ARROW_NUMERIC_TYPES(M) \
M(arrow::Type::UINT8, UInt8) \
M(arrow::Type::INT8, Int8) \
M(arrow::Type::INT16, Int16) \
M(arrow::Type::INT32, Int32) \
M(arrow::Type::UINT64, UInt64) \
M(arrow::Type::INT64, Int64) \
M(arrow::Type::HALF_FLOAT, Float32) \
M(arrow::Type::FLOAT, Float32) \
M(arrow::Type::DOUBLE, Float64)
/// Arrow integer type ids accepted as indexes of a DICTIONARY (LowCardinality) column.
/// Each one is read as the unsigned ClickHouse type of the same width, so signed index
/// values are reinterpreted bit-for-bit as unsigned.
#define FOR_ARROW_INDEXES_TYPES(M) \
M(arrow::Type::UINT8, UInt8) \
M(arrow::Type::INT8, UInt8) \
M(arrow::Type::UINT16, UInt16) \
M(arrow::Type::INT16, UInt16) \
M(arrow::Type::UINT32, UInt32) \
M(arrow::Type::INT32, UInt32) \
M(arrow::Type::UINT64, UInt64) \
M(arrow::Type::INT64, UInt64)
namespace DB
{
/// Error codes used by the exceptions thrown in this file (declared extern here, defined elsewhere).
namespace ErrorCodes
{
extern const int UNKNOWN_TYPE;
extern const int VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE;
extern const int BAD_ARGUMENTS;
extern const int DUPLICATE_COLUMN;
extern const int THERE_IS_NO_COLUMN;
extern const int UNKNOWN_EXCEPTION;
extern const int INCORRECT_NUMBER_OF_COLUMNS;
}
/// Decodes one IEEE 754 binary16 (half precision) bit pattern into a float.
/// Every binary16 value (signed zeros, subnormals, normals, infinities, NaNs) is exactly
/// representable in binary32, so the conversion is lossless.
static float halfFloatBitsToFloat32(uint16_t half_bits)
{
    const uint32_t sign = static_cast<uint32_t>(half_bits & 0x8000u) << 16;
    const uint32_t exponent = (static_cast<uint32_t>(half_bits) >> 10) & 0x1Fu;
    uint32_t mantissa = static_cast<uint32_t>(half_bits) & 0x3FFu;
    uint32_t bits = 0;
    if (exponent == 0x1Fu)
    {
        /// Infinity or NaN; the NaN payload is kept.
        bits = sign | 0x7F800000u | (mantissa << 13);
    }
    else if (exponent != 0)
    {
        /// Normal number: re-bias the exponent from 15 to 127.
        bits = sign | ((exponent + (127 - 15)) << 23) | (mantissa << 13);
    }
    else if (mantissa != 0)
    {
        /// Subnormal half: shift until the implicit leading bit appears, lowering the exponent once per shift.
        uint32_t float_exponent = 127 - 15 + 1;
        while ((mantissa & 0x400u) == 0)
        {
            mantissa <<= 1;
            --float_exponent;
        }
        bits = sign | (float_exponent << 23) | ((mantissa & 0x3FFu) << 13);
    }
    else
    {
        /// Signed zero.
        bits = sign;
    }
    float result;
    std::memcpy(&result, &bits, sizeof(result));
    return result;
}

/// Inserts numeric data right into internal column data to reduce an overhead.
///
/// Values are copied straight from Arrow's value buffer (buffers[1]). Arrow arrays may be slices of a
/// larger buffer, so the chunk's logical offset is applied to the raw value pointer.
/// HALF_FLOAT columns are dispatched here with NumericType = Float32, but they store 2-byte values;
/// those are decoded element by element instead of being reinterpreted as 4-byte floats.
/// Null slots are copied as stored; the caller builds the null map separately.
template <typename NumericType, typename VectorType = ColumnVector<NumericType>>
static ColumnWithTypeAndName readColumnWithNumericData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
    auto internal_type = std::make_shared<DataTypeNumber<NumericType>>();
    auto internal_column = internal_type->createColumn();
    auto & column_data = static_cast<VectorType &>(*internal_column).getData();
    column_data.reserve(arrow_column->length());
    for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
    {
        std::shared_ptr<arrow::Array> chunk = arrow_column->chunk(chunk_i);
        const int64_t chunk_length = chunk->length();
        if (chunk_length == 0)
            continue;
        /// buffers[0] is a null bitmap and buffers[1] are actual values
        const std::shared_ptr<arrow::Buffer> & buffer = chunk->data()->buffers[1];
        if constexpr (std::is_same_v<NumericType, float>)
        {
            if (chunk->type_id() == arrow::Type::HALF_FLOAT)
            {
                const auto * half_data = reinterpret_cast<const uint16_t *>(buffer->data()) + chunk->offset();
                for (int64_t value_i = 0; value_i < chunk_length; ++value_i)
                    column_data.push_back(halfFloatBitsToFloat32(half_data[value_i]));
                continue;
            }
        }
        const auto * raw_data = reinterpret_cast<const NumericType *>(buffer->data()) + chunk->offset();
        column_data.insert_assume_reserved(raw_data, raw_data + chunk_length);
    }
    return {std::move(internal_column), std::move(internal_type), column_name};
}
/// Copies an Arrow STRING/BINARY column into a ClickHouse ColumnString without intermediate objects.
///
/// ColumnString keeps every value followed by a '\0' byte, and its offset for row i points one past that
/// terminator. Compared with Arrow, offsets are therefore shifted by one row, and the final offset equals the
/// total size of the chars buffer. Null slots become empty strings here; the caller attaches the null map.
static ColumnWithTypeAndName readColumnWithStringData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
    auto internal_type = std::make_shared<DataTypeString>();
    auto internal_column = internal_type->createColumn();
    auto & string_column = assert_cast<ColumnString &>(*internal_column);
    PaddedPODArray<UInt8> & chars = string_column.getChars();
    PaddedPODArray<UInt64> & offsets = string_column.getOffsets();

    const int chunk_count = arrow_column->num_chunks();

    /// First pass: an upper bound of the bytes needed (payload end of each chunk plus one terminator per row),
    /// so the copy loop below can use insert_assume_reserved safely.
    size_t bytes_needed = 0;
    for (int chunk_idx = 0; chunk_idx < chunk_count; ++chunk_idx)
    {
        const auto & binary_chunk = dynamic_cast<const arrow::BinaryArray &>(*arrow_column->chunk(chunk_idx));
        const int64_t rows = binary_chunk.length();
        if (rows <= 0)
            continue;
        bytes_needed += binary_chunk.value_offset(rows - 1) + binary_chunk.value_length(rows - 1);
        bytes_needed += rows;
    }
    chars.reserve(bytes_needed);
    offsets.reserve(arrow_column->length());

    /// Second pass: append payload bytes, a terminator and the resulting offset for every row.
    for (int chunk_idx = 0; chunk_idx < chunk_count; ++chunk_idx)
    {
        const auto & binary_chunk = dynamic_cast<const arrow::BinaryArray &>(*arrow_column->chunk(chunk_idx));
        const std::shared_ptr<arrow::Buffer> value_buffer = binary_chunk.value_data();
        const int64_t rows = binary_chunk.length();
        for (int64_t row = 0; row < rows; ++row)
        {
            const bool has_payload = value_buffer && !binary_chunk.IsNull(row);
            if (has_payload)
            {
                const auto * first = value_buffer->data() + binary_chunk.value_offset(row);
                chars.insert_assume_reserved(first, first + binary_chunk.value_length(row));
            }
            chars.emplace_back('\0');
            offsets.emplace_back(chars.size());
        }
    }
    return {std::move(internal_column), std::move(internal_type), column_name};
}
/// Reads an Arrow BOOL column into a UInt8 column (1 = true, 0 = false), one byte per row.
/// Arrow packs booleans as bits, so values are unpacked one at a time with BooleanArray::Value,
/// which also honours the array's slice offset. Null slots are copied as whatever bit they hold;
/// the null map is built separately by readColumnFromArrowColumn for nullable fields.
static ColumnWithTypeAndName readColumnWithBooleanData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
    auto internal_type = std::make_shared<DataTypeUInt8>();
    auto internal_column = internal_type->createColumn();
    auto & column_data = assert_cast<ColumnVector<UInt8> &>(*internal_column).getData();
    column_data.reserve(arrow_column->length());
    for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
    {
        const auto & chunk = dynamic_cast<const arrow::BooleanArray &>(*(arrow_column->chunk(chunk_i)));
        for (int64_t bool_i = 0, length = chunk.length(); bool_i < length; ++bool_i)
            column_data.emplace_back(chunk.Value(bool_i));
    }
    return {std::move(internal_column), std::move(internal_type), column_name};
}
/// Reads an Arrow DATE32 column (Int32 day numbers) into a ClickHouse Date32 column.
///
/// Values are copied directly from the Arrow value buffer via raw_values(), which already applies the
/// array's slice offset. Only non-null values are range-checked, because null slots may contain arbitrary data.
/// @throws Exception(VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE) if a non-null value exceeds DATE_LUT_MAX_EXTEND_DAY_NUM.
static ColumnWithTypeAndName readColumnWithDate32Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
    auto internal_type = std::make_shared<DataTypeDate32>();
    auto internal_column = internal_type->createColumn();
    PaddedPODArray<Int32> & column_data = assert_cast<ColumnVector<Int32> &>(*internal_column).getData();
    column_data.reserve(arrow_column->length());
    for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
    {
        const auto & chunk = dynamic_cast<const arrow::Date32Array &>(*(arrow_column->chunk(chunk_i)));
        const int64_t chunk_length = chunk.length();
        if (chunk_length == 0)
            continue;
        const Int32 * raw_data = chunk.raw_values();
        for (int64_t value_i = 0; value_i < chunk_length; ++value_i)
        {
            if (unlikely(raw_data[value_i] > DATE_LUT_MAX_EXTEND_DAY_NUM) && !chunk.IsNull(value_i))
                throw Exception{
                    ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE,
                    "Input value {} of a column \"{}\" is greater than max allowed Date value, which is {}",
                    raw_data[value_i],
                    column_name,
                    DATE_LUT_MAX_EXTEND_DAY_NUM};
        }
        column_data.insert_assume_reserved(raw_data, raw_data + chunk_length);
    }
    return {std::move(internal_column), std::move(internal_type), column_name};
}
/// Reads an Arrow DATE64 column (Int64, converted here as milliseconds) into a ClickHouse DateTime column (UInt32 seconds).
/// Each value is divided by 1000 and narrowed with static_cast<UInt32>. No range check is performed,
/// so values outside the UInt32 range wrap around. Null slots are converted like any other value.
static ColumnWithTypeAndName readColumnWithDate64Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
    auto internal_type = std::make_shared<DataTypeDateTime>();
    auto internal_column = internal_type->createColumn();
    auto & seconds = assert_cast<ColumnVector<UInt32> &>(*internal_column).getData();
    seconds.reserve(arrow_column->length());
    for (const auto & array : arrow_column->chunks())
    {
        const auto & date64_chunk = dynamic_cast<const arrow::Date64Array &>(*array);
        const int64_t rows = date64_chunk.length();
        for (int64_t row = 0; row < rows; ++row)
        {
            const auto millis = date64_chunk.Value(row);
            seconds.emplace_back(static_cast<UInt32>(millis / 1000));
        }
    }
    return {std::move(internal_column), std::move(internal_type), column_name};
}
/// Reads an Arrow TIMESTAMP column into DateTime64 and keeps the raw tick values unchanged.
/// The DateTime64 scale is unit() * 3. This relies on arrow::TimeUnit enumerating SECOND, MILLI, MICRO and NANO
/// as 0..3, which gives 0, 3, 6 and 9 fractional digits. The Arrow timezone string is passed through to the type.
static ColumnWithTypeAndName readColumnWithTimestampData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
    const auto & timestamp_type = static_cast<const arrow::TimestampType &>(*arrow_column->type());
    const UInt8 scale = timestamp_type.unit() * 3;
    auto internal_type = std::make_shared<DataTypeDateTime64>(scale, timestamp_type.timezone());
    auto internal_column = internal_type->createColumn();
    auto & ticks = assert_cast<ColumnDecimal<DateTime64> &>(*internal_column).getData();
    ticks.reserve(arrow_column->length());
    for (const auto & array : arrow_column->chunks())
    {
        const auto & timestamp_chunk = dynamic_cast<const arrow::TimestampArray &>(*array);
        for (int64_t row = 0, rows = timestamp_chunk.length(); row < rows; ++row)
            ticks.emplace_back(timestamp_chunk.Value(row));
    }
    return {std::move(internal_column), std::move(internal_type), column_name};
}
/// Fills a ColumnDecimal<DecimalType> from Arrow decimal chunks of type DecimalArray.
/// Each non-null Arrow slot is reinterpreted in place as a DecimalType, reading its leading sizeof(DecimalType) bytes.
/// NOTE(review): this assumes a little-endian layout of the Arrow fixed-width decimal value; confirm for other targets.
/// Null slots are stored as DecimalType(0). `internal_type` is used as the resulting column type.
template <typename DecimalType, typename DecimalArray>
static ColumnWithTypeAndName
readColumnWithDecimalDataImpl(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name, DataTypePtr internal_type)
{
    auto internal_column = internal_type->createColumn();
    auto & decimals = assert_cast<ColumnDecimal<DecimalType> &>(*internal_column).getData();
    decimals.reserve(arrow_column->length());
    for (const auto & array : arrow_column->chunks())
    {
        auto & decimal_chunk = dynamic_cast<DecimalArray &>(*array);
        const int64_t rows = decimal_chunk.length();
        for (int64_t row = 0; row < rows; ++row)
        {
            if (decimal_chunk.IsNull(row))
                decimals.emplace_back(DecimalType(0));
            else
                decimals.emplace_back(*reinterpret_cast<const DecimalType *>(decimal_chunk.Value(row))); // TODO: copy column
        }
    }
    return {std::move(internal_column), internal_type, column_name};
}
/// Reads an Arrow DECIMAL128/DECIMAL256 column (selected by DecimalArray) into the narrowest ClickHouse decimal
/// (Decimal32, Decimal64, Decimal128 or Decimal256) whose maximum precision holds the Arrow precision.
/// The Arrow scale is kept unchanged.
template <typename DecimalArray>
static ColumnWithTypeAndName readColumnWithDecimalData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
    const auto & decimal_type = static_cast<const arrow::DecimalType &>(*arrow_column->type());
    const size_t precision = decimal_type.precision();
    auto internal_type = createDecimal<DataTypeDecimal>(precision, decimal_type.scale());

    if (precision > DecimalUtils::max_precision<Decimal128>)
        return readColumnWithDecimalDataImpl<Decimal256, DecimalArray>(arrow_column, column_name, internal_type);
    if (precision > DecimalUtils::max_precision<Decimal64>)
        return readColumnWithDecimalDataImpl<Decimal128, DecimalArray>(arrow_column, column_name, internal_type);
    if (precision > DecimalUtils::max_precision<Decimal32>)
        return readColumnWithDecimalDataImpl<Decimal64, DecimalArray>(arrow_column, column_name, internal_type);
    return readColumnWithDecimalDataImpl<Decimal32, DecimalArray>(arrow_column, column_name, internal_type);
}
/// Expands Arrow's validity information into a ClickHouse null map, with one byte per row:
/// 1 when the row is NULL and 0 otherwise. Chunks are concatenated in order.
static ColumnPtr readByteMapFromArrowColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
{
    auto nullmap_column = ColumnUInt8::create();
    auto & null_flags = assert_cast<ColumnVector<UInt8> &>(*nullmap_column).getData();
    null_flags.reserve(arrow_column->length());
    for (const auto & array : arrow_column->chunks())
    {
        const int64_t rows = array->length();
        for (int64_t row = 0; row < rows; ++row)
            null_flags.emplace_back(array->IsNull(row));
    }
    return std::move(nullmap_column);
}
/// Builds ClickHouse array offsets (cumulative element counts, one per row) from the LIST/MAP chunks of `arrow_column`.
///
/// Element counts are taken per row from Arrow (value_length), so the result does not depend on the first Arrow
/// offset of a chunk being zero. For example, a sliced array or a child array shared by several chunks is handled.
/// The offsets match exactly the element ranges that getNestedArrowColumn extracts.
static ColumnPtr readOffsetsFromArrowListColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
{
    auto offsets_column = ColumnUInt64::create();
    ColumnArray::Offsets & offsets_data = assert_cast<ColumnVector<UInt64> &>(*offsets_column).getData();
    offsets_data.reserve(arrow_column->length());
    UInt64 total_elements = 0;
    for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
    {
        const auto & list_chunk = dynamic_cast<const arrow::ListArray &>(*(arrow_column->chunk(chunk_i)));
        for (int64_t row_i = 0, length = list_chunk.length(); row_i < length; ++row_i)
        {
            total_elements += static_cast<UInt64>(list_chunk.value_length(row_i));
            offsets_data.emplace_back(total_elements);
        }
    }
    return std::move(offsets_column);
}
/// Reads LowCardinality (dictionary) indexes. Signed Arrow index types are read as the unsigned ClickHouse type of
/// the same width (see FOR_ARROW_INDEXES_TYPES).
/// @throws Exception(BAD_ARGUMENTS) for any other index type.
static ColumnPtr readColumnWithIndexesData(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
{
    switch (arrow_column->type()->id())
    {
#define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
    case ARROW_NUMERIC_TYPE: { \
        return readColumnWithNumericData<CPP_NUMERIC_TYPE>(arrow_column, "").column; \
    }
        FOR_ARROW_INDEXES_TYPES(DISPATCH)
#undef DISPATCH
        default:
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported type for indexes in LowCardinality: {}.", arrow_column->type()->name());
    }
}
/// Collects the element (child) arrays of every LIST/MAP chunk into one ChunkedArray, row-aligned with
/// readOffsetsFromArrowListColumn.
///
/// A chunk's child array is used as-is when the chunk references it exactly from element 0 to its end (the
/// common case). This also keeps specialised array subclasses such as ch_parquet::internal::CHStringArray, which
/// readColumnFromArrowColumn detects by dynamic_cast. Otherwise only the referenced range
/// [value_offset(0), value_offset(length)) is taken. The result carries the element type explicitly, so a
/// column without chunks still yields a valid (empty) ChunkedArray.
static std::shared_ptr<arrow::ChunkedArray> getNestedArrowColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
{
    arrow::ArrayVector array_vector;
    array_vector.reserve(arrow_column->num_chunks());
    for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
    {
        const auto & list_chunk = dynamic_cast<const arrow::ListArray &>(*(arrow_column->chunk(chunk_i)));
        std::shared_ptr<arrow::Array> values = list_chunk.values();
        const int64_t length = list_chunk.length();
        const int64_t begin = length > 0 ? static_cast<int64_t>(list_chunk.value_offset(0)) : 0;
        const int64_t end = length > 0 ? static_cast<int64_t>(list_chunk.value_offset(length)) : 0;
        if (begin != 0 || end != values->length())
            values = values->Slice(begin, end - begin);
        array_vector.emplace_back(std::move(values));
    }
    return std::make_shared<arrow::ChunkedArray>(array_vector, arrow_column->type()->field(0)->type());
}
/// Converts one Arrow column (all of its chunks), described by `arrow_field`, into a ClickHouse column.
///
/// @param arrow_field         Arrow field giving the name, type and nullability of the column.
/// @param arrow_column        Chunks holding the data; may contain zero chunks.
/// @param format_name         Input format name, used only in error messages.
/// @param dictionary_values   Cache of converted DICTIONARY values keyed by column name. It is filled on first use
///                            and reused afterwards.
/// @param read_ints_as_dates  When true, UINT16 becomes Date and UINT32 becomes DateTime.
/// @return the converted column, named after the field.
/// @throws Exception(UNKNOWN_TYPE) for Arrow types that are not handled.
///
/// A nullable field is read as its non-nullable counterpart and wrapped in Nullable, with a null map built from
/// Arrow's validity information. If the first chunk is a ch_parquet::internal::CHStringArray, its pre-built
/// ClickHouse column is returned directly. NOTE(review): only chunk 0 is consulted in that case.
/// Intermediate ChunkedArrays are created with an explicit type, so zero-chunk inputs never need a chunk to infer it.
static ColumnWithTypeAndName readColumnFromArrowColumn(
    const std::shared_ptr<arrow::Field> & arrow_field,
    std::shared_ptr<arrow::ChunkedArray> & arrow_column,
    const std::string & format_name,
    std::unordered_map<String, std::shared_ptr<ColumnWithTypeAndName>> & dictionary_values,
    bool read_ints_as_dates)
{
    const auto is_nullable = arrow_field->nullable();
    const auto column_name = arrow_field->name();
    if (is_nullable)
    {
        auto nested_column
            = readColumnFromArrowColumn(arrow_field->WithNullable(false), arrow_column, format_name, dictionary_values, read_ints_as_dates);
        auto nullmap_column = readByteMapFromArrowColumn(arrow_column);
        auto nullable_type = std::make_shared<DataTypeNullable>(std::move(nested_column.type));
        auto nullable_column = ColumnNullable::create(nested_column.column, nullmap_column);
        return {std::move(nullable_column), std::move(nullable_type), column_name};
    }

    /// chunk(0) may only be accessed when the column has at least one chunk.
    if (arrow_column->num_chunks() > 0)
    {
        auto * ch_chunk_array_p = dynamic_cast<ch_parquet::internal::CHStringArray *>(arrow_column->chunk(0).get());
        if (ch_chunk_array_p != nullptr)
        {
            //the values are already written into CH Column, not arrow array
            ch_chunk_array_p->column.name = column_name;
            return ch_chunk_array_p->column;
        }
    }

    switch (arrow_column->type()->id())
    {
        case arrow::Type::STRING:
        case arrow::Type::BINARY:
            //case arrow::Type::FIXED_SIZE_BINARY:
            return readColumnWithStringData(arrow_column, column_name);
        case arrow::Type::BOOL:
            return readColumnWithBooleanData(arrow_column, column_name);
        case arrow::Type::DATE32:
            return readColumnWithDate32Data(arrow_column, column_name);
        case arrow::Type::DATE64:
            return readColumnWithDate64Data(arrow_column, column_name);
        // ClickHouse writes Date as arrow UINT16 and DateTime as arrow UINT32,
        // so, read UINT16 as Date and UINT32 as DateTime to perform correct conversion
        // between Date and DateTime further.
        case arrow::Type::UINT16: {
            auto column = readColumnWithNumericData<UInt16>(arrow_column, column_name);
            if (read_ints_as_dates)
                column.type = std::make_shared<DataTypeDate>();
            return column;
        }
        case arrow::Type::UINT32: {
            auto column = readColumnWithNumericData<UInt32>(arrow_column, column_name);
            if (read_ints_as_dates)
                column.type = std::make_shared<DataTypeDateTime>();
            return column;
        }
        case arrow::Type::TIMESTAMP:
            return readColumnWithTimestampData(arrow_column, column_name);
        case arrow::Type::DECIMAL128:
            return readColumnWithDecimalData<arrow::Decimal128Array>(arrow_column, column_name);
        case arrow::Type::DECIMAL256:
            return readColumnWithDecimalData<arrow::Decimal256Array>(arrow_column, column_name);
        case arrow::Type::MAP: {
            const auto arrow_nested_field = arrow_field->type()->field(0);
            auto arrow_nested_column = getNestedArrowColumn(arrow_column);
            auto nested_column
                = readColumnFromArrowColumn(arrow_nested_field, arrow_nested_column, format_name, dictionary_values, read_ints_as_dates);
            auto offsets_column = readOffsetsFromArrowListColumn(arrow_column);
            const auto * tuple_column = assert_cast<const ColumnTuple *>(nested_column.column.get());
            const auto * tuple_type = assert_cast<const DataTypeTuple *>(nested_column.type.get());
            auto map_column = ColumnMap::create(tuple_column->getColumnPtr(0), tuple_column->getColumnPtr(1), offsets_column);
            auto map_type = std::make_shared<DataTypeMap>(tuple_type->getElements()[0], tuple_type->getElements()[1]);
            return {std::move(map_column), std::move(map_type), column_name};
        }
        case arrow::Type::LIST: {
            const auto arrow_nested_field = arrow_field->type()->field(0);
            auto arrow_nested_column = getNestedArrowColumn(arrow_column);
            auto nested_column
                = readColumnFromArrowColumn(arrow_nested_field, arrow_nested_column, format_name, dictionary_values, read_ints_as_dates);
            auto offsets_column = readOffsetsFromArrowListColumn(arrow_column);
            auto array_column = ColumnArray::create(nested_column.column, offsets_column);
            auto array_type = std::make_shared<DataTypeArray>(nested_column.type);
            return {std::move(array_column), std::move(array_type), column_name};
        }
        case arrow::Type::STRUCT: {
            auto arrow_type = arrow_field->type();
            auto * arrow_struct_type = assert_cast<arrow::StructType *>(arrow_type.get());
            std::vector<arrow::ArrayVector> nested_arrow_columns(arrow_struct_type->num_fields());
            for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
            {
                arrow::StructArray & struct_chunk = dynamic_cast<arrow::StructArray &>(*(arrow_column->chunk(chunk_i)));
                for (int i = 0; i < arrow_struct_type->num_fields(); ++i)
                    nested_arrow_columns[i].emplace_back(struct_chunk.field(i));
            }
            std::vector<String> tuple_names;
            DataTypes tuple_types;
            Columns tuple_elements;
            for (int i = 0; i != arrow_struct_type->num_fields(); ++i)
            {
                const auto & nested_arrow_field = arrow_struct_type->field(i);
                /// Explicit type: the chunk vector is empty when the struct column has no chunks.
                auto nested_arrow_column = std::make_shared<arrow::ChunkedArray>(nested_arrow_columns[i], nested_arrow_field->type());
                auto element = readColumnFromArrowColumn(
                    nested_arrow_field, nested_arrow_column, format_name, dictionary_values, read_ints_as_dates);
                tuple_names.emplace_back(std::move(element.name));
                tuple_types.emplace_back(std::move(element.type));
                tuple_elements.emplace_back(std::move(element.column));
            }
            auto tuple_column = ColumnTuple::create(std::move(tuple_elements));
            auto tuple_type = std::make_shared<DataTypeTuple>(std::move(tuple_types), std::move(tuple_names));
            return {std::move(tuple_column), std::move(tuple_type), column_name};
        }
        case arrow::Type::DICTIONARY: {
            auto * arrow_dict_type = assert_cast<arrow::DictionaryType *>(arrow_field->type().get());
            auto & dict_values = dictionary_values[column_name];
            /// Load dictionary values only once and reuse it.
            if (!dict_values)
            {
                arrow::ArrayVector dict_array;
                for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
                {
                    arrow::DictionaryArray & dict_chunk = dynamic_cast<arrow::DictionaryArray &>(*(arrow_column->chunk(chunk_i)));
                    dict_array.emplace_back(dict_chunk.dictionary());
                }
                auto arrow_dict_field = arrow::field("dict", arrow_dict_type->value_type());
                auto arrow_dict_column = std::make_shared<arrow::ChunkedArray>(dict_array, arrow_dict_type->value_type());
                auto dict_column
                    = readColumnFromArrowColumn(arrow_dict_field, arrow_dict_column, format_name, dictionary_values, read_ints_as_dates);
                /// We should convert read column to ColumnUnique.
                auto tmp_lc_column = DataTypeLowCardinality(dict_column.type).createColumn();
                auto tmp_dict_column = IColumn::mutate(assert_cast<ColumnLowCardinality *>(tmp_lc_column.get())->getDictionaryPtr());
                static_cast<IColumnUnique *>(tmp_dict_column.get())
                    ->uniqueInsertRangeFrom(*dict_column.column, 0, dict_column.column->size());
                dict_column.column = std::move(tmp_dict_column);
                dict_values = std::make_shared<ColumnWithTypeAndName>(std::move(dict_column));
            }
            arrow::ArrayVector indexes_array;
            for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
            {
                arrow::DictionaryArray & dict_chunk = dynamic_cast<arrow::DictionaryArray &>(*(arrow_column->chunk(chunk_i)));
                indexes_array.emplace_back(dict_chunk.indices());
            }
            auto arrow_indexes_column = std::make_shared<arrow::ChunkedArray>(indexes_array, arrow_dict_type->index_type());
            auto indexes_column = readColumnWithIndexesData(arrow_indexes_column);
            auto lc_column = ColumnLowCardinality::create(dict_values->column, indexes_column);
            auto lc_type = std::make_shared<DataTypeLowCardinality>(dict_values->type);
            return {std::move(lc_column), std::move(lc_type), column_name};
        }
#define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
    case ARROW_NUMERIC_TYPE: \
        return readColumnWithNumericData<CPP_NUMERIC_TYPE>(arrow_column, column_name);
            FOR_ARROW_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
        // TODO: read JSON as a string?
        // TODO: read UUID as a string?
        default:
            throw Exception(
                ErrorCodes::UNKNOWN_TYPE,
                "Unsupported {} type '{}' of an input column '{}'.",
                format_name,
                arrow_column->type()->name(),
                column_name);
    }
}
/// Turns a failed Arrow status into a ClickHouse exception (UNKNOWN_EXCEPTION) that names the format and the
/// column and carries Arrow's error text. Returns normally when `status` is OK.
static void checkStatus(const arrow::Status & status, const String & column_name, const String & format_name)
{
    if (status.ok())
        return;
    throw Exception{ErrorCodes::UNKNOWN_EXCEPTION, "Error with a {} column '{}': {}.", format_name, column_name, status.ToString()};
}
/// Derives a ClickHouse header (named, typed, empty columns) from an Arrow schema.
/// For every field an empty Arrow array of the field's type is built and then converted with
/// readColumnFromArrowColumn, using read_ints_as_dates = false and a fresh dictionary cache per field.
/// Arrow builder failures are reported through checkStatus.
Block OptimizedArrowColumnToCHColumn::arrowSchemaToCHHeader(const arrow::Schema & schema, const std::string & format_name)
{
    ColumnsWithTypeAndName sample_columns;
    for (const auto & field : schema.fields())
    {
        std::unique_ptr<arrow::ArrayBuilder> array_builder;
        checkStatus(MakeBuilder(arrow::default_memory_pool(), field->type(), &array_builder), field->name(), format_name);

        std::shared_ptr<arrow::Array> empty_array;
        checkStatus(array_builder->Finish(&empty_array), field->name(), format_name);

        auto arrow_column = std::make_shared<arrow::ChunkedArray>(arrow::ArrayVector{empty_array});
        std::unordered_map<std::string, std::shared_ptr<ColumnWithTypeAndName>> dict_values;
        sample_columns.emplace_back(readColumnFromArrowColumn(field, arrow_column, format_name, dict_values, false));
    }
    return Block(std::move(sample_columns));
}
/// Stores the conversion settings. `header` is the target block layout, and `format_name` is used in error messages.
/// `import_nested` allows header columns to be taken from nested table columns, and `allow_missing_columns`
/// tolerates header columns absent from the input.
OptimizedArrowColumnToCHColumn::OptimizedArrowColumnToCHColumn(
    const Block & header_, const std::string & format_name_, bool import_nested_, bool allow_missing_columns_)
    : header(header_)
    , format_name(format_name_)
    , import_nested(import_nested_)
    , allow_missing_columns(allow_missing_columns_)
{
}
/// Converts an Arrow table into `res`.
/// Every table column is looked up by name. A null lookup result is reported as DUPLICATE_COLUMN, and GetColumnByName
/// returns null when a name is ambiguous. The time spent in arrowColumnsToCHChunk is added to `real_convert`
/// in nanoseconds. A table without columns leaves `res` untouched.
void OptimizedArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table)
{
    NameToColumnPtr name_to_column_ptr;
    for (const auto & column_name : table->ColumnNames())
    {
        auto arrow_column = table->GetColumnByName(column_name);
        if (arrow_column == nullptr)
            throw Exception(ErrorCodes::DUPLICATE_COLUMN, "Column '{}' is duplicated", column_name);
        name_to_column_ptr[column_name] = std::move(arrow_column);
    }

    Stopwatch watch;
    watch.start();
    if (!name_to_column_ptr.empty())
        arrowColumnsToCHChunk(res, name_to_column_ptr, table->schema());
    real_convert += watch.elapsedNanoseconds();
}
/// Fills `res` with one column per header column, in header order. Each column has `num_rows` rows, where
/// `num_rows` is the length of the first entry of `name_to_column_ptr`.
///
/// Each header column is resolved in this order:
/// - a column with the same name in `name_to_column_ptr`, converted with readColumnFromArrowColumn;
/// - when `import_nested` is set, a sub-column extracted from the table column named
///   Nested::extractTableName(name), which is converted at most once per call;
/// - when `allow_missing_columns` is set, the header column resized to `num_rows`, used without a cast.
///   Otherwise THERE_IS_NO_COLUMN is thrown.
/// Resolved columns are cast to the header type. On a cast failure the column name and both types are appended to the error.
/// @throws Exception(INCORRECT_NUMBER_OF_COLUMNS) if `name_to_column_ptr` is empty.
void OptimizedArrowColumnToCHColumn::arrowColumnsToCHChunk(
    Chunk & res, NameToColumnPtr & name_to_column_ptr, const std::shared_ptr<arrow::Schema> & schema)
{
    if (unlikely(name_to_column_ptr.empty()))
        throw Exception(ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS, "Columns is empty");

    const UInt64 num_rows = name_to_column_ptr.begin()->second->length();
    const size_t header_size = header.columns();

    Columns columns_list;
    columns_list.reserve(header_size);

    /// Nested table name -> (converted table column, helper that extracts its sub-columns); filled lazily.
    std::unordered_map<String, std::pair<BlockPtr, std::shared_ptr<local_engine::NestedColumnExtractHelper>>> nested_tables;

    for (size_t position = 0; position < header_size; ++position)
    {
        const ColumnWithTypeAndName & header_column = header.getByPosition(position);
        String wanted_name = header_column.name;
        ColumnWithTypeAndName column;

        if (auto direct_it = name_to_column_ptr.find(wanted_name); direct_it != name_to_column_ptr.end())
        {
            auto arrow_column = direct_it->second;
            const auto & arrow_field = schema->field(schema->GetFieldIndex(wanted_name));
            column = readColumnFromArrowColumn(arrow_field, arrow_column, format_name, dictionary_values, true);
        }
        else
        {
            bool found_in_nested = false;
            if (import_nested)
            {
                String nested_table_name = Nested::extractTableName(header_column.name);
                if (auto table_it = name_to_column_ptr.find(nested_table_name); table_it != name_to_column_ptr.end())
                {
                    auto nested_it = nested_tables.find(nested_table_name);
                    if (nested_it == nested_tables.end())
                    {
                        const auto & arrow_field = schema->field(schema->GetFieldIndex(nested_table_name));
                        std::shared_ptr<arrow::ChunkedArray> arrow_column = table_it->second;
                        ColumnsWithTypeAndName cols
                            = {readColumnFromArrowColumn(arrow_field, arrow_column, format_name, dictionary_values, true)};
                        BlockPtr block_ptr = std::make_shared<Block>(cols);
                        auto column_extractor = std::make_shared<local_engine::NestedColumnExtractHelper>(*block_ptr, true);
                        nested_it = nested_tables.emplace(nested_table_name, std::make_pair(block_ptr, column_extractor)).first;
                    }
                    if (auto nested_column = nested_it->second.second->extractColumn(wanted_name))
                    {
                        column = *nested_column;
                        found_in_nested = true;
                    }
                }
            }

            if (!found_in_nested)
            {
                if (!allow_missing_columns)
                    throw Exception{ErrorCodes::THERE_IS_NO_COLUMN, "Column '{}' is not presented in input data.", header_column.name};
                /// Missing column: the header column resized to num_rows, without a cast.
                columns_list.emplace_back(header_column.column->cloneResized(num_rows));
                continue;
            }
        }

        try
        {
            column.column = castColumn(column, header_column.type);
        }
        catch (Exception & e)
        {
            e.addMessage(fmt::format(
                "while converting column {} from type {} to type {}",
                backQuote(header_column.name),
                column.type->getName(),
                header_column.type->getName()));
            throw;
        }
        columns_list.push_back(std::move(column.column));
    }

    res.setColumns(columns_list, num_rows);
}
/// Returns the positions of header columns that cannot be read from `schema`.
/// A header column is available when the header derived from the schema (arrowSchemaToCHHeader) contains it.
/// When `import_nested` is set, it is also available if it can be extracted from the table column named
/// Nested::extractTableName(column name).
/// @throws Exception(THERE_IS_NO_COLUMN) for an unavailable column unless `allow_missing_columns` is set.
std::vector<size_t> OptimizedArrowColumnToCHColumn::getMissingColumns(const arrow::Schema & schema) const
{
    std::vector<size_t> missing_columns;
    auto block_from_arrow = arrowSchemaToCHHeader(schema, format_name);
    local_engine::NestedColumnExtractHelper nested_table(block_from_arrow, true);

    const size_t header_size = header.columns();
    for (size_t position = 0; position < header_size; ++position)
    {
        const auto & column = header.getByPosition(position);
        if (block_from_arrow.has(column.name))
            continue;

        const bool available_in_nested = import_nested && block_from_arrow.has(Nested::extractTableName(column.name))
            && nested_table.extractColumn(column.name);
        if (available_in_nested)
            continue;

        if (!allow_missing_columns)
            throw Exception{ErrorCodes::THERE_IS_NO_COLUMN, "Column '{}' is not presented in input data.", column.name};
        missing_columns.push_back(position);
    }
    return missing_columns;
}
}
#endif