cpp-ch/local-engine/Common/AggregateUtil.cpp (139 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <Poco/Logger.h>
#include <Common/AggregateUtil.h>
#include <Common/Exception.h>
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
template <typename Method>
static Int32 extractMethodBucketsNum(Method & /*method*/)
{
return Method::Data::NUM_BUCKETS;
}
Int32 GlutenAggregatorUtil::getBucketsNum(AggregatedDataVariants & data_variants)
{
if (!data_variants.isTwoLevel())
{
return 0;
}
Int32 buckets_num = 0;
#define M(NAME) \
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
buckets_num = extractMethodBucketsNum(*data_variants.NAME);
if (false) {} // NOLINT
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
else
throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown aggregated data variant");
return buckets_num;
}
std::optional<Block> GlutenAggregatorUtil::safeConvertOneBucketToBlock(Aggregator & aggregator, AggregatedDataVariants & variants, Arena * arena, bool final, Int32 bucket)
{
if (!variants.isTwoLevel())
return {};
if (bucket >= getBucketsNum(variants))
return {};
return aggregator.convertOneBucketToBlock(variants, arena, final, bucket);
}
template<typename Method>
static void releaseOneBucket(Method & method, Int32 bucket)
{
method.data.impls[bucket].clearAndShrink();
}
void GlutenAggregatorUtil::safeReleaseOneBucket(AggregatedDataVariants & variants, Int32 bucket)
{
if (!variants.isTwoLevel())
return;
if (bucket >= getBucketsNum(variants))
return;
#define M(NAME) \
else if (variants.type == AggregatedDataVariants::Type::NAME) \
releaseOneBucket(*variants.NAME, bucket);
if (false) {} // NOLINT
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
else
throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown aggregated data variant");
}
}
namespace local_engine
{
AggregateDataBlockConverter::AggregateDataBlockConverter(DB::Aggregator & aggregator_, DB::AggregatedDataVariantsPtr data_variants_, bool final_)
: aggregator(aggregator_), data_variants(std::move(data_variants_)), final(final_)
{
if (data_variants->isTwoLevel())
{
buckets_num = DB::GlutenAggregatorUtil::getBucketsNum(*data_variants);
}
else if (data_variants->size())
buckets_num = 1;
else
buckets_num = 0;
}
bool AggregateDataBlockConverter::hasNext()
{
while (current_bucket < buckets_num && output_blocks.empty())
{
if (data_variants->isTwoLevel())
{
Stopwatch watch;
auto optional_block = DB::GlutenAggregatorUtil::safeConvertOneBucketToBlock(
aggregator, *data_variants, data_variants->aggregates_pool, final, current_bucket);
if (!optional_block)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Invalid bucket number {} for two-level aggregation", current_bucket);
auto & block = *optional_block;
LOG_DEBUG(
&Poco::Logger::get("AggregateDataBlockConverter"),
"Convert bucket {} into one block, rows: {}, cols: {}, bytes:{}, total bucket: {}, total rows: {}, time: {}",
current_bucket,
block.rows(),
block.columns(),
ReadableSize(block.allocatedBytes()),
buckets_num,
data_variants->size(),
watch.elapsedMilliseconds());
DB::GlutenAggregatorUtil::safeReleaseOneBucket(*data_variants, current_bucket);
if (block.rows())
output_blocks.emplace_back(std::move(block));
}
else
{
size_t keys = data_variants->size();
auto blocks = aggregator.convertToBlocks(*data_variants, final, 1);
size_t total_allocated_bytes = 0;
size_t total_bytes = 0;
while (!blocks.empty())
{
if (blocks.front().rows())
{
total_allocated_bytes += blocks.front().allocatedBytes();
total_bytes += blocks.front().bytes();
output_blocks.emplace_back(std::move(blocks.front()));
}
blocks.pop_front();
}
LOG_DEBUG(
&Poco::Logger::get("AggregateDataBlockConverter"),
"Convert single level hash table into blocks. blocks: {}, total bytes: {}, total allocated bytes: {}, total rows: {}",
output_blocks.size(),
ReadableSize(total_bytes),
ReadableSize(total_allocated_bytes),
keys);
data_variants = nullptr;
}
++current_bucket;
}
return !output_blocks.empty();
}
DB::Block AggregateDataBlockConverter::next()
{
auto block = output_blocks.front();
output_blocks.pop_front();
return block;
}
}