cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "GraceMergingAggregatedStep.h"
#include <Interpreters/JoinUtils.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Common/CHUtil.h>
#include <Common/CurrentThread.h>
#include <Common/formatReadable.h>
#include <Common/BitHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
namespace local_engine
{
static DB::ITransformingStep::Traits getTraits()
{
return DB::ITransformingStep::Traits
{
{
.preserves_number_of_streams = false,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = false,
}
};
}
static DB::Block buildOutputHeader(const DB::Block & input_header_, const DB::Aggregator::Params params_)
{
return params_.getHeader(input_header_, true);
}
GraceMergingAggregatedStep::GraceMergingAggregatedStep(
DB::ContextPtr context_,
const DB::DataStream & input_stream_,
DB::Aggregator::Params params_)
: DB::ITransformingStep(
input_stream_, buildOutputHeader(input_stream_.header, params_), getTraits())
, context(context_)
, params(std::move(params_))
{
}
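/// The pipeline is narrowed to a single stream so that one GraceMergingAggregatedTransform sees
/// all partially aggregated blocks, and afterwards resized back to the original number of streams.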
void GraceMergingAggregatedStep::transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &)
{
auto num_streams = pipeline.getNumStreams();
auto transform_params = std::make_shared<DB::AggregatingTransformParams>(pipeline.getHeader(), params, true);
pipeline.resize(1);
auto build_transform = [&](DB::OutputPortRawPtrs outputs)
{
DB::Processors new_processors;
for (auto & output : outputs)
{
auto op = std::make_shared<GraceMergingAggregatedTransform>(pipeline.getHeader(), transform_params, context);
new_processors.push_back(op);
DB::connect(*output, op->getInputs().front());
}
return new_processors;
};
pipeline.transform(build_transform);
pipeline.resize(num_streams, true);
}
void GraceMergingAggregatedStep::describeActions(DB::IQueryPlanStep::FormatSettings & settings) const
{
params.explain(settings.out, settings.offset);
}
void GraceMergingAggregatedStep::describeActions(DB::JSONBuilder::JSONMap & map) const
{
params.explain(map);
}
void GraceMergingAggregatedStep::updateOutputStream()
{
output_stream = createOutputStream(input_streams.front(), buildOutputHeader(input_streams.front().header, params), getDataStreamTraits());
}
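/// GraceMergingAggregatedTransform merges partially aggregated blocks with a grace-hash style
/// spilling strategy. Incoming blocks are merged into an in-memory hash table
/// (current_data_variants). When memory pressure is detected, rows are scattered by the hash of
/// the grouping keys into buckets, and every bucket after the one currently being built is spilled
/// to a temporary disk stream. Once the input is exhausted, the buckets are read back and
/// finalized one by one, so only a single bucket's aggregation state has to be held in memory at a
/// time.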
GraceMergingAggregatedTransform::GraceMergingAggregatedTransform(const DB::Block &header_, DB::AggregatingTransformParamsPtr params_, DB::ContextPtr context_)
: IProcessor({header_}, {params_->getHeader()})
, header(header_)
, params(params_)
, context(context_)
, tmp_data_disk(std::make_unique<DB::TemporaryDataOnDisk>(context_->getTempDataOnDisk()))
{
max_buckets = context->getConfigRef().getUInt64("max_grace_aggregate_merging_buckets", 32);
throw_on_overflow_buckets = context->getConfigRef().getBool("throw_on_overflow_grace_aggregate_merging_buckets", false);
aggregated_keys_before_extend_buckets = context->getConfigRef().getUInt64("aggregated_keys_before_extend_grace_aggregate_merging_buckets", 8196);
aggregated_keys_before_extend_buckets = PODArrayUtil::adjustMemoryEfficientSize(aggregated_keys_before_extend_buckets);
max_pending_flush_blocks_per_bucket = context->getConfigRef().getUInt64("max_pending_flush_blocks_per_grace_aggregate_merging_bucket", 1024 * 1024);
max_allowed_memory_usage_ratio = context->getConfigRef().getDouble("max_allowed_memory_usage_ratio_for_aggregate_merging", 0.9);
// Bucket 0 holds the in-memory data; its entry here is just a placeholder.
buckets.emplace(0, BufferFileStream());
current_data_variants = std::make_shared<DB::AggregatedDataVariants>();
}
GraceMergingAggregatedTransform::~GraceMergingAggregatedTransform()
{
LOG_INFO(
logger,
"Metrics. total_input_blocks: {}, total_input_rows: {}, total_output_blocks: {}, total_output_rows: {}, total_spill_disk_bytes: "
"{}, total_spill_disk_time: {}, total_read_disk_time: {}, total_scatter_time: {}",
total_input_blocks,
total_input_rows,
total_output_blocks,
total_output_rows,
total_spill_disk_bytes,
total_spill_disk_time,
total_read_disk_time,
total_scatter_time);
}
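/// Standard IProcessor state machine:
/// - if a finished output chunk is pending, push it once the output port can accept it;
/// - otherwise pull the next chunk from the input and let work() merge it;
/// - after the input is finished, keep returning Ready until every bucket has been emitted, then
///   finish the output port.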
GraceMergingAggregatedTransform::Status GraceMergingAggregatedTransform::prepare()
{
auto & output = outputs.front();
auto & input = inputs.front();
if (output.isFinished())
{
input.close();
return Status::Finished;
}
if (has_output)
{
if (output.canPush())
{
LOG_DEBUG(
logger,
"Output one chunk. rows: {}, bytes: {}, current memory usage: {}",
output_chunk.getNumRows(),
ReadableSize(output_chunk.bytes()),
ReadableSize(MemoryUtil::getCurrentMemoryUsage()));
total_output_rows += output_chunk.getNumRows();
total_output_blocks++;
output.push(std::move(output_chunk));
has_output = false;
}
return Status::PortFull;
}
if (has_input)
return Status::Ready;
if (!input_finished)
{
if (input.isFinished())
{
input_finished = true;
return Status::Ready;
}
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
input_chunk = input.pull(true);
LOG_DEBUG(
logger,
"Input one new chunk. rows: {}, bytes: {}, current memory usage: {}",
input_chunk.getNumRows(),
ReadableSize(input_chunk.bytes()),
ReadableSize(MemoryUtil::getCurrentMemoryUsage()));
total_input_rows += input_chunk.getNumRows();
total_input_blocks++;
has_input = true;
return Status::Ready;
}
if (current_bucket_index >= getBucketsNum() && (!block_converter || !block_converter->hasNext()))
{
output.finish();
return Status::Finished;
}
return Status::Ready;
}
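/// While input is still arriving, each pulled chunk is merged into the in-memory state (and
/// possibly spilled). After the input is finished, the buckets are drained one by one:
/// prepareBucketOutputBlocks() builds a converter for the next non-empty bucket, and each call to
/// work() emits at most one non-empty output block from it.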
void GraceMergingAggregatedTransform::work()
{
if (has_input)
{
assert(!input_finished);
auto block = header.cloneWithColumns(input_chunk.detachColumns());
mergeOneBlock(block);
has_input = false;
}
else
{
assert(input_finished);
if (!block_converter || !block_converter->hasNext())
{
block_converter = nullptr;
while (current_bucket_index < getBucketsNum())
{
block_converter = prepareBucketOutputBlocks(current_bucket_index);
if (block_converter)
break;
current_bucket_index++;
}
}
if (!block_converter)
{
return;
}
while (block_converter->hasNext())
{
auto block = block_converter->next();
if (!block.rows())
continue;
output_chunk = DB::Chunk(block.getColumns(), block.rows());
has_output = true;
break;
}
if (!block_converter->hasNext())
{
block_converter = nullptr;
current_bucket_index++;
}
}
}
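/// Double the bucket count once the in-memory hash table holds at least
/// aggregated_keys_before_extend_buckets keys. Returns false, keeping the current bucket count,
/// when the table is still small or when doubling would exceed max_buckets; if
/// throw_on_overflow_buckets is enabled, exceeding the limit throws instead.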
bool GraceMergingAggregatedTransform::extendBuckets()
{
if (!current_data_variants || current_data_variants->size() < aggregated_keys_before_extend_buckets)
return false;
auto current_size = getBucketsNum();
auto next_size = current_size * 2;
/// We have a soft limit on the number of buckets. When throw_on_overflow_buckets = false, we just
/// continue to run with the current number of buckets until the executor is killed by the Spark scheduler.
if (next_size > max_buckets)
{
if (throw_on_overflow_buckets)
throw DB::Exception(
DB::ErrorCodes::LOGICAL_ERROR,
"Too many buckets, limit is {}. Please consider increate offhead size or partitoin number",
max_buckets);
else
return false;
}
LOG_DEBUG(logger, "Extend buckets num from {} to {}", current_size, next_size);
for (size_t i = current_size; i < next_size; ++i)
buckets.emplace(i, BufferFileStream());
return true;
}
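/// Called after the bucket count has been extended. The current in-memory aggregation state is
/// converted back into blocks and rescattered under the new bucket count: parts that now belong to
/// later buckets are queued for their file buckets, while the part for the current bucket is merged
/// into a fresh in-memory state. This also refreshes the per_key_memory_usage estimate used by
/// isMemoryOverflow().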
void GraceMergingAggregatedTransform::rehashDataVariants()
{
auto before_memory_usage = MemoryUtil::getCurrentMemoryUsage();
auto converter = currentDataVariantToBlockConverter(false);
checkAndSetupCurrentDataVariants();
size_t block_rows = 0;
size_t block_memory_usage = 0;
no_more_keys = false;
size_t bucket_n = 0;
while (converter->hasNext())
{
auto block = converter->next();
if (!block.rows())
continue;
block_rows += block.rows();
block_memory_usage += block.allocatedBytes();
auto scattered_blocks = scatterBlock(block);
block = {};
/// The newly scattered blocks always belong to buckets with index >= current_bucket_index.
for (size_t i = 0; i < current_bucket_index; ++i)
{
if (scattered_blocks[i].rows())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Scattered blocks should not belong to buckets with index({}) < current_bucket_index({})", i, current_bucket_index);
}
for (size_t i = current_bucket_index + 1; i < getBucketsNum(); ++i)
{
addBlockIntoFileBucket(i, scattered_blocks[i]);
scattered_blocks[i] = {};
}
params->aggregator.mergeOnBlock(scattered_blocks[current_bucket_index], *current_data_variants, no_more_keys);
}
if (block_rows)
per_key_memory_usage = block_memory_usage * 1.0 / block_rows;
LOG_INFO(
logger,
"Rehash data variants. current_bucket_index: {}, buckets num: {}, memory usage change, from {} to {}",
current_bucket_index,
getBucketsNum(),
ReadableSize(before_memory_usage),
ReadableSize(MemoryUtil::getCurrentMemoryUsage()));
}
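/// Split a block into getBucketsNum() blocks by hashing the grouping keys. Each scattered block is
/// stamped with the bucket count that was used, so a later change of the bucket count can be
/// detected and the block rescattered (see mergeOneBlock()).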
DB::Blocks GraceMergingAggregatedTransform::scatterBlock(const DB::Block & block)
{
if (!block.rows())
return {};
Stopwatch watch;
size_t bucket_num = getBucketsNum();
auto blocks = DB::JoinCommon::scatterBlockByHash(params->params.keys, block, bucket_num);
for (auto & new_block : blocks)
{
new_block.info.bucket_num = static_cast<Int32>(bucket_num);
}
total_scatter_time += watch.elapsedMilliseconds();
return blocks;
}
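/// Queue a scattered block for the given bucket. Blocks stay in memory until the bucket's pending
/// bytes exceed max_pending_flush_blocks_per_bucket, at which point the bucket is flushed to its
/// temporary file.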
void GraceMergingAggregatedTransform::addBlockIntoFileBucket(size_t bucket_index, const DB::Block & block)
{
if (!block.rows())
return;
if (roundUpToPowerOfTwoOrZero(bucket_index + 1) > static_cast<size_t>(block.info.bucket_num))
{
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Add invalid block with bucket_num {} into bucket {}", block.info.bucket_num, bucket_index);
}
auto & file_stream = buckets[bucket_index];
file_stream.pending_bytes += block.allocatedBytes();
file_stream.blocks.push_back(block);
if (file_stream.pending_bytes > max_pending_flush_blocks_per_bucket)
{
flushBucket(bucket_index);
file_stream.pending_bytes = 0;
}
}
void GraceMergingAggregatedTransform::flushBuckets()
{
for (size_t i = current_bucket_index + 1; i < getBucketsNum(); ++i)
flushBucket(i);
}
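/// Write the pending blocks of one bucket to its temporary file stream. Consecutive blocks that
/// were scattered with the same bucket count are concatenated first to reduce the number of blocks
/// written. Returns the number of bytes flushed.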
size_t GraceMergingAggregatedTransform::flushBucket(size_t bucket_index)
{
Stopwatch watch;
auto & file_stream = buckets[bucket_index];
if (file_stream.blocks.empty())
return 0;
if (!file_stream.file_stream)
file_stream.file_stream = &tmp_data_disk->createStream(header);
DB::Blocks blocks;
size_t flush_bytes = 0;
while (!file_stream.blocks.empty())
{
while (!file_stream.blocks.empty())
{
if (!blocks.empty() && blocks.back().info.bucket_num != file_stream.blocks.front().info.bucket_num)
break;
blocks.push_back(file_stream.blocks.front());
file_stream.blocks.pop_front();
}
auto bucket = blocks.front().info.bucket_num;
auto merged_block = BlockUtil::concatenateBlocksMemoryEfficiently(std::move(blocks));
merged_block.info.bucket_num = bucket;
blocks.clear();
flush_bytes += merged_block.bytes();
if (merged_block.rows())
{
file_stream.file_stream->write(merged_block);
}
}
if (flush_bytes)
file_stream.file_stream->flush();
total_spill_disk_bytes += flush_bytes;
total_spill_disk_time += watch.elapsedMilliseconds();
return flush_bytes;
}
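/// Build the output for one bucket: read back all blocks spilled to disk plus any blocks still
/// pending in memory, merge them into the in-memory aggregation state, and return a converter that
/// yields the finalized result blocks. Returns nullptr when there is neither in-memory state nor
/// any spilled or pending data for this bucket.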
std::unique_ptr<AggregateDataBlockConverter> GraceMergingAggregatedTransform::prepareBucketOutputBlocks(size_t bucket_index)
{
auto & buffer_file_stream = buckets[bucket_index];
if (!current_data_variants && !buffer_file_stream.file_stream && buffer_file_stream.blocks.empty())
{
return nullptr;
}
size_t read_bytes = 0;
size_t read_rows = 0;
Stopwatch watch;
checkAndSetupCurrentDataVariants();
if (buffer_file_stream.file_stream)
{
buffer_file_stream.file_stream->finishWriting();
while (true)
{
auto block = buffer_file_stream.file_stream->read();
if (!block.rows())
break;
read_bytes += block.bytes();
read_rows += block.rows();
mergeOneBlock(block);
block = {};
}
buffer_file_stream.file_stream = nullptr;
total_read_disk_time += watch.elapsedMilliseconds();
}
if (!buffer_file_stream.blocks.empty())
{
for (auto & block : buffer_file_stream.blocks)
{
mergeOneBlock(block);
block = {};
}
}
auto last_data_variants_size = current_data_variants->size();
auto converter = currentDataVariantToBlockConverter(true);
LOG_INFO(
logger,
"prepare to output bucket {}, aggregated result keys: {}, keys size: {}, read bytes from disk: {}, read rows: {}, time: {} ms",
bucket_index,
last_data_variants_size,
params->params.keys_size,
ReadableSize(read_bytes),
read_rows,
watch.elapsedMilliseconds());
return converter;
}
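/// Hand the current in-memory aggregation state over to an AggregateDataBlockConverter and reset
/// current_data_variants. With final == true the converter produces finalized result blocks;
/// otherwise it produces intermediate blocks that can be rescattered and merged again.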
std::unique_ptr<AggregateDataBlockConverter> GraceMergingAggregatedTransform::currentDataVariantToBlockConverter(bool final)
{
if (!current_data_variants)
{
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "current data variants is null");
}
auto converter = std::make_unique<AggregateDataBlockConverter>(params->aggregator, current_data_variants, final);
current_data_variants = nullptr;
return converter;
}
void GraceMergingAggregatedTransform::checkAndSetupCurrentDataVariants()
{
if (!current_data_variants)
{
current_data_variants = std::make_shared<DB::AggregatedDataVariants>();
no_more_keys = false;
}
}
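/// Merge one partially aggregated block into the in-memory state. If memory is overflowing, the
/// pending blocks of later buckets are flushed to disk first and, if that is not enough, the
/// bucket count is extended and the in-memory state rehashed. A block whose recorded bucket count
/// no longer matches the current one is rescattered: only the part falling into the current bucket
/// is merged, the rest is queued for later buckets.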
void GraceMergingAggregatedTransform::mergeOneBlock(const DB::Block &block)
{
if (!block.rows())
return;
checkAndSetupCurrentDataVariants();
// First, flush the pending blocks of later buckets to disk.
if (isMemoryOverflow())
flushBuckets();
// Then try to extend the bucket count and rehash the in-memory state.
if (isMemoryOverflow() && extendBuckets())
{
rehashDataVariants();
}
LOG_DEBUG(
logger,
"merge on block, rows: {}, bytes:{}, bucket: {}. current bucket: {}, total bucket: {}, mem used: {}",
block.rows(),
ReadableSize(block.bytes()),
block.info.bucket_num,
current_bucket_index,
getBucketsNum(),
ReadableSize(MemoryUtil::getCurrentMemoryUsage()));
/// The block could be one read back from disk. block.info.bucket_num stores the bucket count at the time
/// the block was scattered, so if the bucket count has not changed since then, it need not be scattered again.
if (block.info.bucket_num == static_cast<Int32>(getBucketsNum()) || getBucketsNum() == 1)
{
params->aggregator.mergeOnBlock(block, *current_data_variants, no_more_keys);
}
else
{
auto bucket_num = block.info.bucket_num;
auto scattered_blocks = scatterBlock(block);
for (size_t i = 0; i < current_bucket_index; ++i)
{
if (scattered_blocks[i].rows())
{
throw DB::Exception(
DB::ErrorCodes::LOGICAL_ERROR,
"Scattered blocks should not belong to buckets with index({}) < current_bucket_index({}). bucket_num:{}. "
"scattered_blocks.size: {}, total buckets: {}",
i,
current_bucket_index,
bucket_num,
scattered_blocks.size(),
getBucketsNum());
}
}
for (size_t i = current_bucket_index + 1; i < getBucketsNum(); ++i)
{
addBlockIntoFileBucket(i, scattered_blocks[i]);
}
params->aggregator.mergeOnBlock(scattered_blocks[current_bucket_index], *current_data_variants, no_more_keys);
}
}
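/// Decide whether memory usage is too high to keep merging in memory. The budget is
/// max_memory_usage * max_allowed_memory_usage_ratio. When a per-key memory estimate is available,
/// the projected usage current_mem_used + per_key_memory_usage * number_of_keys is compared
/// against the budget; otherwise the check already triggers once current usage reaches half of the
/// budget. Returns false when max_memory_usage is unset.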
bool GraceMergingAggregatedTransform::isMemoryOverflow()
{
/// A rather greedy memory usage strategy.
if (!current_data_variants)
return false;
if (!context->getSettingsRef().max_memory_usage)
return false;
auto max_mem_used = static_cast<size_t>(context->getSettingsRef().max_memory_usage * max_allowed_memory_usage_ratio);
auto current_result_rows = current_data_variants->size();
auto current_mem_used = MemoryUtil::getCurrentMemoryUsage();
if (per_key_memory_usage > 0)
{
if (current_mem_used + per_key_memory_usage * current_result_rows >= max_mem_used)
{
LOG_INFO(
logger,
"Memory is overflow. current_mem_used: {}, max_mem_used: {}, per_key_memory_usage: {}, aggregator keys: {}, buckets: {}, hash table type: {}",
ReadableSize(current_mem_used),
ReadableSize(max_mem_used),
ReadableSize(per_key_memory_usage),
current_result_rows,
getBucketsNum(),
current_data_variants->type);
return true;
}
}
else
{
if (current_mem_used * 2 >= max_mem_used)
{
LOG_INFO(
logger,
"Memory is overflow on half of max usage. current_mem_used: {}, max_mem_used: {}, aggregator keys: {}, buckets: {}, hash table type: {}",
ReadableSize(current_mem_used),
ReadableSize(max_mem_used),
current_result_rows,
getBucketsNum(),
current_data_variants->type);
return true;
}
}
return false;
}
}