cpp-ch/local-engine/Operator/GraceAggregatingTransform.cpp

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "GraceAggregatingTransform.h"
#include <Processors/Port.h>
#include <Common/BitHelpers.h>
#include <Common/CHUtil.h>
#include <Common/CurrentThread.h>
#include <Common/GlutenConfig.h>
#include <Common/QueryContext.h>
#include <Common/formatReadable.h>

namespace DB::ErrorCodes
{
extern const int LOGICAL_ERROR;
}

namespace local_engine
{
GraceAggregatingTransform::GraceAggregatingTransform(
    const DB::Block & header_,
    DB::AggregatingTransformParamsPtr params_,
    DB::ContextPtr context_,
    bool no_pre_aggregated_,
    bool final_output_)
    : IProcessor({header_}, {params_->getHeader()})
    , header(header_)
    , params(params_)
    , context(context_)
    , key_columns(params_->params.keys_size)
    , aggregate_columns(params_->params.aggregates_size)
    , no_pre_aggregated(no_pre_aggregated_)
    , final_output(final_output_)
    , tmp_data_disk(context_->getTempDataOnDisk())
{
    output_header = params->getHeader();
    auto config = GraceMergingAggregateConfig::loadFromContext(context);
    max_buckets = config.max_grace_aggregate_merging_buckets;
    throw_on_overflow_buckets = config.throw_on_overflow_grace_aggregate_merging_buckets;
    aggregated_keys_before_extend_buckets = config.aggregated_keys_before_extend_grace_aggregate_merging_buckets;
    aggregated_keys_before_extend_buckets = PODArrayUtil::adjustMemoryEfficientSize(aggregated_keys_before_extend_buckets);
    max_pending_flush_blocks_per_bucket = config.max_pending_flush_blocks_per_grace_aggregate_merging_bucket;
    max_allowed_memory_usage_ratio = config.max_allowed_memory_usage_ratio_for_aggregate_merging;
    // Bucket 0 is for in-memory data; it's just a placeholder.
    buckets.emplace(0, BufferFileStream());
    enable_spill_test = config.enable_spill_test;
    if (enable_spill_test)
        buckets.emplace(1, BufferFileStream());
    current_data_variants = std::make_shared<DB::AggregatedDataVariants>();
    // IProcessor::spillable. MemorySpillScheduler triggers a spill by enabling this flag.
    spillable = true;
}

GraceAggregatingTransform::~GraceAggregatingTransform()
{
    LOG_INFO(
        logger,
        "Metrics. total_input_blocks: {}, total_input_rows: {}, total_output_blocks: {}, total_output_rows: {}, total_spill_disk_bytes: "
        "{}, total_spill_disk_time: {}, total_read_disk_time: {}, total_scatter_time: {}",
        total_input_blocks,
        total_input_rows,
        total_output_blocks,
        total_output_rows,
        total_spill_disk_bytes,
        total_spill_disk_time,
        total_read_disk_time,
        total_scatter_time);
}
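/// prepare() and work() follow the usual IProcessor pull protocol: prepare() only moves
/// chunks across the ports and picks the next state, while work() does the actual
/// merging. Once the input is exhausted, the remaining buckets are drained one by one
/// through block_converter.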
GraceAggregatingTransform::Status GraceAggregatingTransform::prepare()
{
    auto & output = outputs.front();
    auto & input = inputs.front();
    if (output.isFinished() || isCancelled())
    {
        input.close();
        return Status::Finished;
    }
    if (has_output)
    {
        if (output.canPush())
        {
            LOG_DEBUG(
                logger,
                "Output one chunk. rows: {}, bytes: {}, current memory usage: {}",
                output_chunk.getNumRows(),
                ReadableSize(output_chunk.bytes()),
                ReadableSize(currentThreadGroupMemoryUsage()));
            total_output_rows += output_chunk.getNumRows();
            total_output_blocks++;
            output.push(std::move(output_chunk));
            has_output = false;
        }
        return Status::PortFull;
    }
    if (has_input)
        return Status::Ready;
    if (!input_finished)
    {
        if (input.isFinished())
        {
            input_finished = true;
            return Status::Ready;
        }
        input.setNeeded();
        if (!input.hasData())
            return Status::NeedData;
        input_chunk = input.pull(true);
        LOG_DEBUG(
            logger,
            "Input one new chunk. rows: {}, bytes: {}, current memory usage: {}",
            input_chunk.getNumRows(),
            ReadableSize(input_chunk.bytes()),
            ReadableSize(currentThreadGroupMemoryUsage()));
        total_input_rows += input_chunk.getNumRows();
        total_input_blocks++;
        has_input = true;
        return Status::Ready;
    }
    if (current_bucket_index >= getBucketsNum() && (!block_converter || !block_converter->hasNext()))
    {
        output.finish();
        return Status::Finished;
    }
    return Status::Ready;
}

void GraceAggregatingTransform::work()
{
    if (has_input)
    {
        assert(!input_finished);
        auto block = header.cloneWithColumns(input_chunk.detachColumns());
        mergeOneBlock(block, true);
        has_input = false;
    }
    else
    {
        assert(input_finished);
        if (!block_converter || !block_converter->hasNext())
        {
            block_converter = nullptr;
            while (current_bucket_index < getBucketsNum())
            {
                block_converter = prepareBucketOutputBlocks(current_bucket_index);
                if (block_converter)
                    break;
                current_bucket_index++;
            }
        }
        if (!block_converter)
            return;

        while (block_converter->hasNext())
        {
            auto block = block_converter->next();
            if (!block.rows())
                continue;
            output_chunk = DB::Chunk(block.getColumns(), block.rows());
            has_output = true;
            break;
        }
        if (!block_converter->hasNext())
        {
            block_converter = nullptr;
            current_bucket_index++;
        }
    }
}
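/// Once the in-memory hash table holds enough distinct keys, the bucket count is
/// doubled; rehashDataVariants() then re-scatters the in-memory state so that every
/// row lands in a bucket with index >= current_bucket_index.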
bool GraceAggregatingTransform::extendBuckets()
{
    if (!current_data_variants || current_data_variants->size() < aggregated_keys_before_extend_buckets)
        return false;
    auto current_size = getBucketsNum();
    auto next_size = current_size * 2;
    /// We have a soft limit on the number of buckets. When throw_on_overflow_buckets = false, we just
    /// continue to run with the current number of buckets until the executor is killed by the Spark scheduler.
    if (next_size > max_buckets)
    {
        if (throw_on_overflow_buckets)
            throw DB::Exception(
                DB::ErrorCodes::LOGICAL_ERROR,
                "Too many buckets, limit is {}. Please consider increasing the off-heap size or the partition number",
                max_buckets);
        else
            return false;
    }
    LOG_DEBUG(logger, "Extend buckets num from {} to {}", current_size, next_size);
    for (size_t i = current_size; i < next_size; ++i)
        buckets.emplace(i, BufferFileStream());
    return true;
}

void GraceAggregatingTransform::rehashDataVariants()
{
    auto before_memory_usage = currentThreadGroupMemoryUsage();
    auto converter = currentDataVariantToBlockConverter(false);
    checkAndSetupCurrentDataVariants();
    size_t block_rows = 0;
    size_t block_memory_usage = 0;
    no_more_keys = false;

    while (converter->hasNext())
    {
        auto block = converter->next();
        if (!block.rows())
            continue;
        block_rows += block.rows();
        block_memory_usage += block.allocatedBytes();
        auto scattered_blocks = scatterBlock(block);
        block = {};
        /// The new scattered blocks will always belong to buckets with index >= current_bucket_index.
        for (size_t i = 0; i < current_bucket_index; ++i)
        {
            if (scattered_blocks[i].rows())
                throw DB::Exception(
                    DB::ErrorCodes::LOGICAL_ERROR,
                    "Scattered blocks should not belong to buckets with index({}) < current_bucket_index({})",
                    i,
                    current_bucket_index);
        }
        for (size_t i = current_bucket_index + 1; i < getBucketsNum(); ++i)
        {
            addBlockIntoFileBucket(i, scattered_blocks[i], false);
            scattered_blocks[i] = {};
        }
        params->aggregator.mergeOnBlock(scattered_blocks[current_bucket_index], *current_data_variants, no_more_keys, is_cancelled);
    }
    if (block_rows)
        per_key_memory_usage = block_memory_usage * 1.0 / block_rows;

    LOG_INFO(
        logger,
        "Rehash data variants. current_bucket_index: {}, buckets num: {}, memory usage change, from {} to {}",
        current_bucket_index,
        getBucketsNum(),
        ReadableSize(before_memory_usage),
        ReadableSize(currentThreadGroupMemoryUsage()));
}

DB::Blocks GraceAggregatingTransform::scatterBlock(const DB::Block & block)
{
    if (!block.rows())
        return {};
    Stopwatch watch;
    size_t bucket_num = getBucketsNum();
    auto blocks = DB::JoinCommon::scatterBlockByHash(params->params.keys, block, bucket_num);
    /// Note: info.bucket_num records the bucket count used for scattering, not a bucket index.
    for (auto & new_block : blocks)
        new_block.info.bucket_num = static_cast<Int32>(bucket_num);
    total_scatter_time += watch.elapsedMilliseconds();
    return blocks;
}

void GraceAggregatingTransform::addBlockIntoFileBucket(size_t bucket_index, const DB::Block & block, bool is_original_block)
{
    if (!block.rows())
        return;
    if (roundUpToPowerOfTwoOrZero(bucket_index + 1) > static_cast<size_t>(block.info.bucket_num))
    {
        throw DB::Exception(
            DB::ErrorCodes::LOGICAL_ERROR, "Add invalid block with bucket_num {} into bucket {}", block.info.bucket_num, bucket_index);
    }
    auto & file_stream = buckets[bucket_index];
    file_stream.pending_bytes += block.allocatedBytes();
    if (is_original_block && no_pre_aggregated)
        file_stream.original_blocks.push_back(block);
    else
        file_stream.intermediate_blocks.push_back(block);
    if (file_stream.pending_bytes > max_pending_flush_blocks_per_bucket || (file_stream.pending_bytes && enable_spill_test))
    {
        flushBucket(bucket_index);
        file_stream.pending_bytes = 0;
    }
}

void GraceAggregatingTransform::flushBuckets()
{
    for (size_t i = current_bucket_index + 1; i < getBucketsNum(); ++i)
        flushBucket(i);
}
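/// Spill path: consecutive pending blocks that share the same info.bucket_num are
/// concatenated into one larger block before being written, which keeps the temporary
/// file compact while preserving the bucket-count metadata that later decides whether
/// a block read back from disk must be re-scattered.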
static size_t flushBlocksIntoDisk(std::optional<DB::TemporaryBlockStreamHolder> & file_stream, std::list<DB::Block> & blocks)
{
    size_t flush_bytes = 0;
    DB::Blocks tmp_blocks;
    if (!file_stream)
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "file_stream is empty");
    auto & tmp_stream = file_stream.value();
    while (!blocks.empty())
    {
        // Collect a run of consecutive blocks that share the same bucket_num.
        while (!blocks.empty())
        {
            if (!tmp_blocks.empty() && tmp_blocks.back().info.bucket_num != blocks.front().info.bucket_num)
                break;
            tmp_blocks.push_back(blocks.front());
            blocks.pop_front();
        }
        auto bucket = tmp_blocks.front().info.bucket_num;
        auto merged_block = BlockUtil::concatenateBlocksMemoryEfficiently(std::move(tmp_blocks));
        merged_block.info.bucket_num = bucket;
        tmp_blocks.clear();
        flush_bytes += merged_block.bytes();
        if (merged_block.rows())
            tmp_stream->write(merged_block);
    }
    if (flush_bytes)
        tmp_stream->flush();
    return flush_bytes;
}

size_t GraceAggregatingTransform::flushBucket(size_t bucket_index)
{
    Stopwatch watch;
    auto & file_stream = buckets[bucket_index];
    size_t flush_bytes = 0;
    if (!file_stream.original_blocks.empty())
    {
        if (!file_stream.original_file_stream)
            file_stream.original_file_stream = DB::TemporaryBlockStreamHolder(header, tmp_data_disk.get());
        flush_bytes += flushBlocksIntoDisk(file_stream.original_file_stream, file_stream.original_blocks);
    }
    if (!file_stream.intermediate_blocks.empty())
    {
        if (!file_stream.intermediate_file_stream)
        {
            auto intermediate_header = params->aggregator.getHeader(false);
            file_stream.intermediate_file_stream = DB::TemporaryBlockStreamHolder(intermediate_header, tmp_data_disk.get());
        }
        flush_bytes += flushBlocksIntoDisk(file_stream.intermediate_file_stream, file_stream.intermediate_blocks);
    }
    total_spill_disk_bytes += flush_bytes;
    total_spill_disk_time += watch.elapsedMilliseconds();
    return flush_bytes;
}
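/// Draining one bucket: spilled intermediate and original blocks are read back from
/// disk and merged into current_data_variants together with any still-buffered
/// in-memory blocks, then the variants are handed off to an AggregateDataBlockConverter
/// that emits the bucket's output blocks incrementally. Returns nullptr if the bucket
/// holds no data at all.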
std::unique_ptr<AggregateDataBlockConverter> GraceAggregatingTransform::prepareBucketOutputBlocks(size_t bucket_index)
{
    auto & buffer_file_stream = buckets[bucket_index];
    if (!current_data_variants && !buffer_file_stream.intermediate_file_stream && buffer_file_stream.intermediate_blocks.empty()
        && !buffer_file_stream.original_file_stream && buffer_file_stream.original_blocks.empty())
    {
        return nullptr;
    }
    size_t read_bytes = 0;
    size_t read_rows = 0;
    Stopwatch watch;

    checkAndSetupCurrentDataVariants();

    if (buffer_file_stream.intermediate_file_stream)
    {
        buffer_file_stream.intermediate_file_stream->finishWriting();
        auto reader = buffer_file_stream.intermediate_file_stream->getReadStream();
        while (true)
        {
            auto block = reader->read();
            if (!block.rows())
                break;
            read_bytes += block.bytes();
            read_rows += block.rows();
            mergeOneBlock(block, false);
            block = {};
        }
        buffer_file_stream.intermediate_file_stream.reset();
        total_read_disk_time += watch.elapsedMilliseconds();
    }
    if (!buffer_file_stream.intermediate_blocks.empty())
    {
        for (auto & block : buffer_file_stream.intermediate_blocks)
        {
            mergeOneBlock(block, false);
            block = {};
        }
    }

    if (buffer_file_stream.original_file_stream)
    {
        buffer_file_stream.original_file_stream->finishWriting();
        auto reader = buffer_file_stream.original_file_stream->getReadStream();
        while (true)
        {
            auto block = reader->read();
            if (!block.rows())
                break;
            read_bytes += block.bytes();
            read_rows += block.rows();
            mergeOneBlock(block, true);
            block = {};
        }
        buffer_file_stream.original_file_stream.reset();
        total_read_disk_time += watch.elapsedMilliseconds();
    }
    if (!buffer_file_stream.original_blocks.empty())
    {
        for (auto & block : buffer_file_stream.original_blocks)
        {
            mergeOneBlock(block, true);
            block = {};
        }
    }

    auto last_data_variants_size = current_data_variants->size();
    auto converter = currentDataVariantToBlockConverter(final_output);
    LOG_INFO(
        logger,
        "prepare to output bucket {}, aggregated result keys: {}, keys size: {}, read bytes from disk: {}, read rows: {}, time: {} ms",
        bucket_index,
        last_data_variants_size,
        params->params.keys_size,
        ReadableSize(read_bytes),
        read_rows,
        watch.elapsedMilliseconds());
    return converter;
}

std::unique_ptr<AggregateDataBlockConverter> GraceAggregatingTransform::currentDataVariantToBlockConverter(bool final)
{
    if (!current_data_variants)
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "current data variants is null");
    auto converter = std::make_unique<AggregateDataBlockConverter>(params->aggregator, current_data_variants, final);
    current_data_variants = nullptr;
    return converter;
}

void GraceAggregatingTransform::checkAndSetupCurrentDataVariants()
{
    if (!current_data_variants)
    {
        current_data_variants = std::make_shared<DB::AggregatedDataVariants>();
        no_more_keys = false;
    }
}

void GraceAggregatingTransform::mergeOneBlock(const DB::Block & block, bool is_original_block)
{
    if (!block.rows())
        return;

    checkAndSetupCurrentDataVariants();

    // First, flush pending bytes to disk.
    if (isMemoryOverflow())
        flushBuckets();
    // Then try to extend the buckets.
    if (isMemoryOverflow() && extendBuckets())
        rehashDataVariants();
    // Reset the force-spill flag.
    force_spill = false;

    LOG_DEBUG(
        logger,
        "merge on block, rows: {}, bytes: {}, bucket: {}. current bucket: {}, total bucket: {}, mem used: {}",
        block.rows(),
        ReadableSize(block.bytes()),
        block.info.bucket_num,
        current_bucket_index,
        getBucketsNum(),
        ReadableSize(currentThreadGroupMemoryUsage()));

    /// The block may have been read back from disk. block.info.bucket_num stores the number of buckets
    /// at the time it was scattered, so if the bucket count has not changed since then, there is no
    /// need to scatter it again.
    if (block.info.bucket_num == static_cast<Int32>(getBucketsNum()) || getBucketsNum() == 1)
    {
        if (is_original_block && no_pre_aggregated)
            params->aggregator.executeOnBlock(block, *current_data_variants, key_columns, aggregate_columns, no_more_keys);
        else
            params->aggregator.mergeOnBlock(block, *current_data_variants, no_more_keys, is_cancelled);
    }
    else
    {
        auto bucket_num = block.info.bucket_num;
        auto scattered_blocks = scatterBlock(block);
        for (size_t i = 0; i < current_bucket_index; ++i)
        {
            if (scattered_blocks[i].rows())
            {
                throw DB::Exception(
                    DB::ErrorCodes::LOGICAL_ERROR,
                    "Scattered blocks should not belong to buckets with index({}) < current_bucket_index({}). bucket_num: {}. "
                    "scattered_blocks.size: {}, total buckets: {}",
                    i,
                    current_bucket_index,
                    bucket_num,
                    scattered_blocks.size(),
                    getBucketsNum());
            }
        }
        for (size_t i = current_bucket_index + 1; i < getBucketsNum(); ++i)
            addBlockIntoFileBucket(i, scattered_blocks[i], is_original_block);

        if (is_original_block && no_pre_aggregated)
        {
            params->aggregator.executeOnBlock(
                scattered_blocks[current_bucket_index], *current_data_variants, key_columns, aggregate_columns, no_more_keys);
        }
        else
        {
            params->aggregator.mergeOnBlock(scattered_blocks[current_bucket_index], *current_data_variants, no_more_keys, is_cancelled);
        }
    }
}
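/// Two estimation modes: after the first rehash has produced a measured
/// per_key_memory_usage, the projected footprint (current usage plus keys times
/// per-key cost) is compared against max_allowed_memory_usage_ratio of the memory
/// soft limit; before that, a conservative half-of-limit heuristic is used. A
/// force_spill request from the MemorySpillScheduler short-circuits both checks
/// once 80% of the requested bytes are spillable.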
bool GraceAggregatingTransform::isMemoryOverflow()
{
    if (force_spill)
    {
        auto stats = getMemoryStats();
        if (stats.spillable_memory_bytes > force_spill_on_bytes * 0.8)
            return true;
    }

    /// More greedy memory usage strategy.
    if (!current_data_variants)
        return false;

    auto memory_soft_limit = DB::CurrentThread::getGroup()->memory_tracker.getSoftLimit();
    if (!memory_soft_limit)
        return false;
    auto max_mem_used = static_cast<size_t>(memory_soft_limit * max_allowed_memory_usage_ratio);
    auto current_result_rows = current_data_variants->size();
    auto current_mem_used = currentThreadGroupMemoryUsage();
    if (per_key_memory_usage > 0)
    {
        if (current_mem_used + per_key_memory_usage * current_result_rows >= max_mem_used)
        {
            LOG_INFO(
                logger,
                "Memory overflow. current_mem_used: {}, max_mem_used: {}, per_key_memory_usage: {}, aggregator keys: {}, buckets: {}, "
                "hash table type: {}",
                ReadableSize(current_mem_used),
                ReadableSize(max_mem_used),
                ReadableSize(per_key_memory_usage),
                current_result_rows,
                getBucketsNum(),
                current_data_variants->type);
            return true;
        }
    }
    else
    {
        if (current_mem_used * 2 >= max_mem_used)
        {
            LOG_INFO(
                logger,
                "Memory overflow at half of max usage. current_mem_used: {}, max_mem_used: {}, aggregator keys: {}, buckets: {}, hash "
                "table type: {}",
                ReadableSize(current_mem_used),
                ReadableSize(max_mem_used),
                current_result_rows,
                getBucketsNum(),
                current_data_variants->type);
            return true;
        }
    }
    return false;
}

DB::ProcessorMemoryStats GraceAggregatingTransform::getMemoryStats()
{
    DB::ProcessorMemoryStats stats;
    if (!current_data_variants)
        return stats;
    stats.need_reserved_memory_bytes = current_data_variants->aggregates_pool->allocatedBytes();
    for (size_t i = current_bucket_index + 1; i < getBucketsNum(); ++i)
    {
        auto & file_stream = buckets[i];
        stats.spillable_memory_bytes += file_stream.pending_bytes;
    }
    if (per_key_memory_usage > 0)
    {
        auto current_result_rows = current_data_variants->size();
        stats.need_reserved_memory_bytes += current_result_rows * per_key_memory_usage;
        stats.spillable_memory_bytes += current_result_rows * per_key_memory_usage;
    }
    else
    {
        // This is a rough estimation; we don't know the exact memory usage for each key.
        stats.spillable_memory_bytes += current_data_variants->aggregates_pool->allocatedBytes();
    }
    return stats;
}

bool GraceAggregatingTransform::spillOnSize(size_t bytes)
{
    auto stats = getMemoryStats();
    if (stats.spillable_memory_bytes < bytes * 0.8)
        return false;
    force_spill = true;
    force_spill_on_bytes = bytes;
    return true;
}
}
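/* Illustrative sketch only (not part of the upstream file): a transform like this is
   inserted into a ClickHouse query pipeline by the step that owns it; `pipe`, `params`,
   `context` and the two flag values below are assumptions for the example.

       pipe.addSimpleTransform([&](const DB::Block & in_header)
       {
           return std::make_shared<local_engine::GraceAggregatingTransform>(
               in_header, params, context, /*no_pre_aggregated_*/ true, /*final_output_*/ true);
       });
*/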