cpp-ch/local-engine/Storages/IO/NativeReader.cpp (193 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "NativeReader.h"
#include <IO/ReadHelpers.h>
#include <IO/VarInt.h>
#include <DataTypes/DataTypeFactory.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Common/Arena.h>
#include <Storages/IO/NativeWriter.h>
#include <Storages/IO/AggregateSerializationUtils.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_INDEX;
extern const int LOGICAL_ERROR;
extern const int CANNOT_READ_ALL_DATA;
extern const int INCORRECT_DATA;
extern const int TOO_LARGE_ARRAY_SIZE;
}
}
using namespace DB;
namespace local_engine
{
Block NativeReader::getHeader() const
{
return header;
}
DB::Block NativeReader::read()
{
DB::Block result_block;
if (istr.eof())
return result_block;
if (columns_parse_util.empty())
{
result_block = prepareByFirstBlock();
if (!result_block)
return {};
}
else
{
result_block = header.cloneEmpty();
}
/// Append small blocks into a large one, could reduce memory allocations overhead.
while (result_block.rows() < max_block_size && result_block.bytes() < max_block_bytes)
{
if (!appendNextBlock(result_block))
break;
}
if (result_block.rows())
{
for (size_t i = 0; i < result_block.columns(); ++i)
{
auto & column_parse_util = columns_parse_util[i];
auto & column = result_block.getByPosition(i);
column_parse_util.avg_value_size_hint = column.column->byteSize() / column.column->size();
}
}
return result_block;
}
static void readFixedSizeAggregateData(DB::ReadBuffer &in, DB::ColumnPtr & column, size_t rows, NativeReader::ColumnParseUtil & column_parse_util)
{
ColumnAggregateFunction & real_column = typeid_cast<ColumnAggregateFunction &>(*column->assumeMutable());
auto & arena = real_column.createOrGetArena();
ColumnAggregateFunction::Container & vec = real_column.getData();
size_t initial_size = vec.size();
vec.reserve(initial_size + rows);
for (size_t i = 0; i < rows; ++i)
{
AggregateDataPtr place = arena.alignedAlloc(column_parse_util.aggregate_state_size, column_parse_util.aggregate_state_align);
column_parse_util.aggregate_function->create(place);
auto n = in.read(place, column_parse_util.aggregate_state_size);
chassert(n == column_parse_util.aggregate_state_size);
vec.push_back(place);
}
}
static void readVarSizeAggregateData(DB::ReadBuffer &in, DB::ColumnPtr & column, size_t rows, NativeReader::ColumnParseUtil & column_parse_util)
{
ColumnAggregateFunction & real_column = typeid_cast<ColumnAggregateFunction &>(*column->assumeMutable());
auto & arena = real_column.createOrGetArena();
ColumnAggregateFunction::Container & vec = real_column.getData();
size_t initial_size = vec.size();
vec.reserve(initial_size + rows);
for (size_t i = 0; i < rows; ++i)
{
AggregateDataPtr place = arena.alignedAlloc(column_parse_util.aggregate_state_size, column_parse_util.aggregate_state_align);
column_parse_util.aggregate_function->create(place);
column_parse_util.aggregate_function->deserialize(place, in, std::nullopt, &arena);
in.ignore();
vec.push_back(place);
}
}
static void readNormalData(DB::ReadBuffer &in, DB::ColumnPtr & column, size_t rows, NativeReader::ColumnParseUtil & column_parse_util)
{
ISerialization::DeserializeBinaryBulkSettings settings;
settings.getter = [&](ISerialization::SubstreamPath) -> ReadBuffer * { return ∈ };
settings.avg_value_size_hint = column_parse_util.avg_value_size_hint;
settings.position_independent_encoding = false;
settings.native_format = true;
ISerialization::DeserializeBinaryBulkStatePtr state;
column_parse_util.serializer->deserializeBinaryBulkStatePrefix(settings, state);
column_parse_util.serializer->deserializeBinaryBulkWithMultipleStreams(column, rows, settings, state, nullptr);
}
DB::Block NativeReader::prepareByFirstBlock()
{
if (istr.eof())
return {};
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
DB::Block result_block;
size_t columns = 0;
size_t rows = 0;
readVarUInt(columns, istr);
readVarUInt(rows, istr);
if (columns > 1'000'000uz)
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Suspiciously many columns in Native format: {}", columns);
if (rows > 1'000'000'000'000uz)
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Suspiciously many rows in Native format: {}", rows);
if (columns == 0 && !header && rows != 0)
throw Exception(ErrorCodes::INCORRECT_DATA, "Zero columns but {} rows in Native format.", rows);
for (size_t i = 0; i < columns; ++i)
{
ColumnWithTypeAndName column;
ColumnParseUtil column_parse_util;
column.name = "col_" + std::to_string(i);
/// Type
String type_name;
readBinary(type_name, istr);
bool agg_opt_column = false;
String real_type_name = type_name;
if (type_name.ends_with(NativeWriter::AGG_STATE_SUFFIX))
{
agg_opt_column = true;
real_type_name = type_name.substr(0, type_name.length() - NativeWriter::AGG_STATE_SUFFIX.length());
}
column.type = data_type_factory.get(real_type_name);
bool is_agg_state_type = WhichDataType(column.type).isAggregateFunction();
SerializationPtr serialization = column.type->getDefaultSerialization();
column_parse_util.type = column.type;
column_parse_util.name = column.name;
column_parse_util.serializer = serialization;
/// Data
ColumnPtr read_column = column.type->createColumn(*serialization);
if (rows) /// If no rows, nothing to read.
{
if (is_agg_state_type && agg_opt_column)
{
const DataTypeAggregateFunction * agg_type = checkAndGetDataType<DataTypeAggregateFunction>(column.type.get());
auto aggregate_function = agg_type->getFunction();
column_parse_util.aggregate_function = aggregate_function;
column_parse_util.aggregate_state_size = aggregate_function->sizeOfData();
column_parse_util.aggregate_state_align = aggregate_function->alignOfData();
bool fixed = isFixedSizeAggregateFunction(aggregate_function);
if (fixed)
{
readFixedSizeAggregateData(istr, read_column, rows, column_parse_util);
column_parse_util.parse = readFixedSizeAggregateData;
}
else
{
readVarSizeAggregateData(istr, read_column, rows, column_parse_util);
column_parse_util.parse = readVarSizeAggregateData;
}
}
else
{
readNormalData(istr, read_column, rows, column_parse_util);
column_parse_util.parse = readNormalData;
}
}
column.column = std::move(read_column);
result_block.insert(std::move(column));
columns_parse_util.emplace_back(column_parse_util);
}
header = result_block.cloneEmpty();
return result_block;
}
bool NativeReader::appendNextBlock(DB::Block & result_block)
{
if (istr.eof())
return false;
size_t columns = 0;
size_t rows = 0;
readVarUInt(columns, istr);
readVarUInt(rows, istr);
for (size_t i = 0; i < columns; ++i)
{
// Not actually read type name.
size_t type_name_len = 0;
readVarUInt(type_name_len, istr);
istr.ignore(type_name_len);
if (!rows) [[unlikely]]
continue;
auto & column_parse_util = columns_parse_util[i];
auto & column = result_block.getByPosition(i);
column_parse_util.parse(istr, column.column, rows, column_parse_util);
}
return true;
}
}