in cpp-ch/local-engine/Storages/ch_parquet/OptimizedArrowColumnToCHColumn.cpp [350:518]
/// Converts one Arrow chunked column into a ClickHouse ColumnWithTypeAndName.
///
/// The conversion proceeds in this order:
///  1. If the Arrow field is nullable, the column is first converted through a
///     non-nullable copy of the field and then wrapped into ColumnNullable, using
///     the byte map returned by readByteMapFromArrowColumn as the null map.
///  2. If the first chunk is a ch_parquet::internal::CHStringArray, the ClickHouse
///     column it already carries is returned (after being renamed).
///  3. Otherwise the Arrow type id of the column selects the reader. MAP, LIST,
///     STRUCT and DICTIONARY recurse into their nested columns.
///
/// @param arrow_field        Arrow field describing the column (name, nullability, nested types).
/// @param arrow_column       Arrow data to convert.
/// @param format_name        Only used in the error message for unsupported types.
/// @param dictionary_values  Cache of converted dictionary values keyed by column name;
///                           filled on first use by the DICTIONARY branch and reused afterwards.
/// @param read_ints_as_dates When true, UINT16 is typed as Date and UINT32 as DateTime.
/// @return The converted column together with its ClickHouse type and name.
/// @throws Exception with ErrorCodes::UNKNOWN_TYPE when the Arrow type is not handled.
static ColumnWithTypeAndName readColumnFromArrowColumn(
    const std::shared_ptr<arrow::Field> & arrow_field,
    std::shared_ptr<arrow::ChunkedArray> & arrow_column,
    const std::string & format_name,
    std::unordered_map<String, std::shared_ptr<ColumnWithTypeAndName>> & dictionary_values,
    bool read_ints_as_dates)
{
    const auto is_nullable = arrow_field->nullable();
    const auto column_name = arrow_field->name();
    if (is_nullable)
    {
        /// Read the values through a non-nullable copy of the field, then attach the null map.
        auto nested_column
            = readColumnFromArrowColumn(arrow_field->WithNullable(false), arrow_column, format_name, dictionary_values, read_ints_as_dates);
        auto nullmap_column = readByteMapFromArrowColumn(arrow_column);
        auto nullable_type = std::make_shared<DataTypeNullable>(std::move(nested_column.type));
        auto nullable_column = ColumnNullable::create(nested_column.column, nullmap_column);
        return {std::move(nullable_column), std::move(nullable_type), column_name};
    }

    /// ChunkedArray::chunk() is not bounds-checked, so the first chunk may only be
    /// inspected when the column actually has chunks. A column without chunks falls
    /// through to the regular type dispatch below.
    auto * ch_chunk_array_p = arrow_column->num_chunks() > 0
        ? dynamic_cast<ch_parquet::internal::CHStringArray *>(arrow_column->chunk(0).get())
        : nullptr;
    if (ch_chunk_array_p != nullptr)
    {
        //the values are already written into CH Column, not arrow array
        ch_chunk_array_p->column.name = column_name;
        return ch_chunk_array_p->column;
    }

    switch (arrow_column->type()->id())
    {
        case arrow::Type::STRING:
        case arrow::Type::BINARY:
            //case arrow::Type::FIXED_SIZE_BINARY:
            return readColumnWithStringData(arrow_column, column_name);
        case arrow::Type::BOOL:
            return readColumnWithBooleanData(arrow_column, column_name);
        case arrow::Type::DATE32:
            return readColumnWithDate32Data(arrow_column, column_name);
        case arrow::Type::DATE64:
            return readColumnWithDate64Data(arrow_column, column_name);
        // ClickHouse writes Date as arrow UINT16 and DateTime as arrow UINT32,
        // so, read UINT16 as Date and UINT32 as DateTime to perform correct conversion
        // between Date and DateTime further.
        case arrow::Type::UINT16: {
            auto column = readColumnWithNumericData<UInt16>(arrow_column, column_name);
            if (read_ints_as_dates)
                column.type = std::make_shared<DataTypeDate>();
            return column;
        }
        case arrow::Type::UINT32: {
            auto column = readColumnWithNumericData<UInt32>(arrow_column, column_name);
            if (read_ints_as_dates)
                column.type = std::make_shared<DataTypeDateTime>();
            return column;
        }
        case arrow::Type::TIMESTAMP:
            return readColumnWithTimestampData(arrow_column, column_name);
        case arrow::Type::DECIMAL128:
            return readColumnWithDecimalData<arrow::Decimal128Array>(arrow_column, column_name);
        case arrow::Type::DECIMAL256:
            return readColumnWithDecimalData<arrow::Decimal256Array>(arrow_column, column_name);
        case arrow::Type::MAP: {
            /// The nested column is converted first (the casts below require it to be a tuple);
            /// its two elements become the key and value columns of the map.
            const auto arrow_nested_field = arrow_field->type()->field(0);
            auto arrow_nested_column = getNestedArrowColumn(arrow_column);
            auto nested_column
                = readColumnFromArrowColumn(arrow_nested_field, arrow_nested_column, format_name, dictionary_values, read_ints_as_dates);
            auto offsets_column = readOffsetsFromArrowListColumn(arrow_column);
            const auto * tuple_column = assert_cast<const ColumnTuple *>(nested_column.column.get());
            const auto * tuple_type = assert_cast<const DataTypeTuple *>(nested_column.type.get());
            auto map_column = ColumnMap::create(tuple_column->getColumnPtr(0), tuple_column->getColumnPtr(1), offsets_column);
            auto map_type = std::make_shared<DataTypeMap>(tuple_type->getElements()[0], tuple_type->getElements()[1]);
            return {std::move(map_column), std::move(map_type), column_name};
        }
        case arrow::Type::LIST: {
            /// Convert the nested column recursively, then combine it with the list offsets.
            const auto arrow_nested_field = arrow_field->type()->field(0);
            auto arrow_nested_column = getNestedArrowColumn(arrow_column);
            auto nested_column
                = readColumnFromArrowColumn(arrow_nested_field, arrow_nested_column, format_name, dictionary_values, read_ints_as_dates);
            auto offsets_column = readOffsetsFromArrowListColumn(arrow_column);
            auto array_column = ColumnArray::create(nested_column.column, offsets_column);
            auto array_type = std::make_shared<DataTypeArray>(nested_column.type);
            return {std::move(array_column), std::move(array_type), column_name};
        }
        case arrow::Type::STRUCT: {
            auto arrow_type = arrow_field->type();
            auto * arrow_struct_type = assert_cast<arrow::StructType *>(arrow_type.get());

            /// Regroup the data: for every struct field collect its arrays across all chunks.
            std::vector<arrow::ArrayVector> nested_arrow_columns(arrow_struct_type->num_fields());
            for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
            {
                /// Reference dynamic_cast: throws std::bad_cast if the chunk is not a StructArray.
                arrow::StructArray & struct_chunk = dynamic_cast<arrow::StructArray &>(*(arrow_column->chunk(chunk_i)));
                for (int i = 0; i < arrow_struct_type->num_fields(); ++i)
                    nested_arrow_columns[i].emplace_back(struct_chunk.field(i));
            }

            /// Convert every struct field recursively and assemble the tuple from the results.
            std::vector<String> tuple_names;
            DataTypes tuple_types;
            Columns tuple_elements;
            for (int i = 0; i != arrow_struct_type->num_fields(); ++i)
            {
                const auto & nested_arrow_field = arrow_struct_type->field(i);
                auto nested_arrow_column = std::make_shared<arrow::ChunkedArray>(nested_arrow_columns[i]);
                auto element = readColumnFromArrowColumn(
                    nested_arrow_field, nested_arrow_column, format_name, dictionary_values, read_ints_as_dates);
                tuple_names.emplace_back(std::move(element.name));
                tuple_types.emplace_back(std::move(element.type));
                tuple_elements.emplace_back(std::move(element.column));
            }
            auto tuple_column = ColumnTuple::create(std::move(tuple_elements));
            auto tuple_type = std::make_shared<DataTypeTuple>(std::move(tuple_types), std::move(tuple_names));
            return {std::move(tuple_column), std::move(tuple_type), column_name};
        }
        case arrow::Type::DICTIONARY: {
            auto & dict_values = dictionary_values[column_name];
            /// Load dictionary values only once and reuse it.
            if (!dict_values)
            {
                arrow::ArrayVector dict_array;
                for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
                {
                    arrow::DictionaryArray & dict_chunk = dynamic_cast<arrow::DictionaryArray &>(*(arrow_column->chunk(chunk_i)));
                    dict_array.emplace_back(dict_chunk.dictionary());
                }
                auto * arrow_dict_type = assert_cast<arrow::DictionaryType *>(arrow_field->type().get());
                auto arrow_dict_field = arrow::field("dict", arrow_dict_type->value_type());
                auto arrow_dict_column = std::make_shared<arrow::ChunkedArray>(dict_array);
                auto dict_column
                    = readColumnFromArrowColumn(arrow_dict_field, arrow_dict_column, format_name, dictionary_values, read_ints_as_dates);
                /// We should convert read column to ColumnUnique.
                auto tmp_lc_column = DataTypeLowCardinality(dict_column.type).createColumn();
                auto tmp_dict_column = IColumn::mutate(assert_cast<ColumnLowCardinality *>(tmp_lc_column.get())->getDictionaryPtr());
                static_cast<IColumnUnique *>(tmp_dict_column.get())
                    ->uniqueInsertRangeFrom(*dict_column.column, 0, dict_column.column->size());
                dict_column.column = std::move(tmp_dict_column);
                dict_values = std::make_shared<ColumnWithTypeAndName>(std::move(dict_column));
            }

            /// Gather the index arrays of all chunks and pair them with the cached dictionary.
            arrow::ArrayVector indexes_array;
            for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->num_chunks()); chunk_i < num_chunks; ++chunk_i)
            {
                arrow::DictionaryArray & dict_chunk = dynamic_cast<arrow::DictionaryArray &>(*(arrow_column->chunk(chunk_i)));
                indexes_array.emplace_back(dict_chunk.indices());
            }
            auto arrow_indexes_column = std::make_shared<arrow::ChunkedArray>(indexes_array);
            auto indexes_column = readColumnWithIndexesData(arrow_indexes_column);
            auto lc_column = ColumnLowCardinality::create(dict_values->column, indexes_column);
            auto lc_type = std::make_shared<DataTypeLowCardinality>(dict_values->type);
            return {std::move(lc_column), std::move(lc_type), column_name};
        }
/// One `case` per (Arrow type id, C++ numeric type) pair supplied by FOR_ARROW_NUMERIC_TYPES.
#define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
    case ARROW_NUMERIC_TYPE: \
        return readColumnWithNumericData<CPP_NUMERIC_TYPE>(arrow_column, column_name);
            FOR_ARROW_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
        // TODO: read JSON as a string?
        // TODO: read UUID as a string?
        default:
            throw Exception(
                ErrorCodes::UNKNOWN_TYPE,
                "Unsupported {} type '{}' of an input column '{}'.",
                format_name,
                arrow_column->type()->name(),
                column_name);
    }
}