// velox/exec/Aggregate.h
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <algorithm>
#include <cstdint>
#include <functional>
#include <limits>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "velox/core/PlanNode.h"
#include "velox/vector/BaseVector.h"
namespace facebook::velox {
class HashStringAllocator;
}
namespace facebook::velox::exec {
class AggregateFunctionSignature;
// Returns true if aggregation receives raw (unprocessed) input, e.g. partial
// and single aggregation.
bool isRawInput(core::AggregationNode::Step step);
// Returns false if aggregation produces final result, e.g. final
// and single aggregation.
bool isPartialOutput(core::AggregationNode::Step step);
// Base class for aggregate functions. A single instance handles both raw
// input and intermediate results, based on the assumption that the partial
// accumulator and the final accumulator are of the same type.
class Aggregate {
 protected:
  // @param resultType Type of the values produced by extractValues() /
  // extractAccumulators(). Taken by value and moved to avoid a shared_ptr
  // copy.
  explicit Aggregate(TypePtr resultType) : resultType_(std::move(resultType)) {}

 public:
  virtual ~Aggregate() = default;

  // Type of the result produced by this aggregate.
  TypePtr resultType() const {
    return resultType_;
  }

  // Returns the fixed number of bytes the accumulator takes on a group
  // row. Variable width accumulators will reference the variable
  // width part of the state from the fixed part.
  virtual int32_t accumulatorFixedWidthSize() const = 0;

  // Returns true if the accumulator is allocated from external memory, e.g.
  // memory not managed by Velox.
  virtual bool accumulatorUsesExternalMemory() const {
    return false;
  }

  // Returns true if the accumulator never takes more than
  // accumulatorFixedWidthSize() bytes.
  virtual bool isFixedSize() const {
    return true;
  }

  // Sets the allocator used for any out-of-line (variable width) accumulator
  // state. Must be called before accumulators allocate such state. The
  // allocator is not owned by this object.
  void setAllocator(HashStringAllocator* allocator) {
    allocator_ = allocator;
  }

  // Sets the offset and null indicator position of 'this'.
  // @param offset Offset in bytes from the start of the row of the accumulator
  // @param nullByte Offset in bytes from the start of the row of the null flag
  // @param nullMask The specific bit in the nullByte that stores the null flag
  // @param rowSizeOffset The offset of a uint32_t row size from the start of
  // the row. Only applies to accumulators that store variable size data out of
  // line. Fixed length accumulators do not use this. 0 if the row does not
  // have a size field.
  void setOffsets(
      int32_t offset,
      int32_t nullByte,
      uint8_t nullMask,
      int32_t rowSizeOffset) {
    nullByte_ = nullByte;
    nullMask_ = nullMask;
    offset_ = offset;
    rowSizeOffset_ = rowSizeOffset;
  }

  // Initializes null flags and accumulators for newly encountered groups.
  // @param groups Pointers to the start of the new group rows.
  // @param indices Indices into 'groups' of the new entries.
  virtual void initializeNewGroups(
      char** groups,
      folly::Range<const vector_size_t*> indices) = 0;

  // Updates partial accumulators from raw input data.
  // @param groups Pointers to the start of the group rows. These are aligned
  // with the 'args', e.g. data in the i-th row of the 'args' goes to the i-th
  // group. The groups may repeat if different rows go into the same group.
  // @param rows Rows of the 'args' to add to the accumulators. These may not
  // be contiguous if the aggregation has mask or is configured to drop null
  // grouping keys. The latter would be the case when aggregation is followed
  // by the join on the grouping keys.
  // @param args Raw input.
  // @param mayPushdown True if aggregation can be pushed down via LazyVector.
  // The pushdown can happen only if this flag is true and 'args' is a single
  // LazyVector.
  virtual void addRawInput(
      char** groups,
      const SelectivityVector& rows,
      const std::vector<VectorPtr>& args,
      bool mayPushdown) = 0;

  // Updates final accumulators from intermediate results.
  // @param groups Pointers to the start of the group rows. These are aligned
  // with the 'args', e.g. data in the i-th row of the 'args' goes to the i-th
  // group. The groups may repeat if different rows go into the same group.
  // @param rows Rows of the 'args' to add to the accumulators. These may not
  // be contiguous if the aggregation has mask or is configured to drop null
  // grouping keys. The latter would be the case when aggregation is followed
  // by the join on the grouping keys.
  // @param args Intermediate results produced by extractAccumulators().
  // @param mayPushdown True if aggregation can be pushed down via LazyVector.
  // The pushdown can happen only if this flag is true and 'args' is a single
  // LazyVector.
  virtual void addIntermediateResults(
      char** groups,
      const SelectivityVector& rows,
      const std::vector<VectorPtr>& args,
      bool mayPushdown) = 0;

  // Updates the single partial accumulator from raw input data for global
  // aggregation.
  // @param group Pointer to the start of the group row.
  // @param rows Rows of the 'args' to add to the accumulators. These may not
  // be contiguous if the aggregation has mask.
  // @param args Raw input to add to the accumulators.
  // @param mayPushdown True if aggregation can be pushed down via LazyVector.
  // The pushdown can happen only if this flag is true and 'args' is a single
  // LazyVector.
  virtual void addSingleGroupRawInput(
      char* group,
      const SelectivityVector& rows,
      const std::vector<VectorPtr>& args,
      bool mayPushdown) = 0;

  // Updates the single final accumulator from intermediate results for global
  // aggregation.
  // @param group Pointer to the start of the group row.
  // @param rows Rows of the 'args' to add to the accumulators. These may not
  // be contiguous if the aggregation has mask.
  // @param args Intermediate results produced by extractAccumulators().
  // @param mayPushdown True if aggregation can be pushed down via LazyVector.
  // The pushdown can happen only if this flag is true and 'args' is a single
  // LazyVector.
  virtual void addSingleGroupIntermediateResults(
      char* group,
      const SelectivityVector& rows,
      const std::vector<VectorPtr>& args,
      bool mayPushdown) = 0;

  // Finalizes the state in groups. Defaults to no op for cases like
  // sum and max.
  virtual void finalize(char** groups, int32_t numGroups) = 0;

  // Extracts final results (used for final and single aggregations).
  // @param groups Pointers to the start of the group rows.
  // @param numGroups Number of groups to extract results from.
  // @param result The result vector to store the results in.
  //
  // 'result' and its parts are expected to be singly referenced. If
  // other threads or operators hold references that they would use
  // after 'result' has been updated by this, effects will be unpredictable.
  virtual void
  extractValues(char** groups, int32_t numGroups, VectorPtr* result) = 0;

  // Extracts partial results (used for partial and intermediate aggregations).
  // @param groups Pointers to the start of the group rows.
  // @param numGroups Number of groups to extract results from.
  // @param result The result vector to store the results in.
  //
  // See comment on 'result' in extractValues().
  virtual void
  extractAccumulators(char** groups, int32_t numGroups, VectorPtr* result) = 0;

  // Frees any out of line storage for the accumulator in
  // 'groups'. No-op for fixed length accumulators.
  virtual void destroy(folly::Range<char**> /*groups*/) {}

  // Clears state between reuses, e.g. this is called before reusing
  // the aggregation operator's state after flushing a partial
  // aggregation.
  void clear() {
    numNulls_ = 0;
  }

  // Creates an instance of the aggregate function registered under 'name' for
  // the given step and argument types.
  static std::unique_ptr<Aggregate> create(
      const std::string& name,
      core::AggregationNode::Step step,
      const std::vector<TypePtr>& argTypes,
      const TypePtr& resultType);

 protected:
  // Returns true if the accumulator in 'group' is null. The numNulls_ check
  // short-circuits the byte read when no accumulator is null.
  bool isNull(char* group) const {
    return numNulls_ && (group[nullByte_] & nullMask_);
  }

  // Adds 'bytes' to the uint32_t byte size stored in 'row' at
  // rowSizeOffset_. Saturates at UINT32_MAX instead of wrapping; sizes past
  // 4G would finish the batch in any case, so larger values need not be
  // represented.
  void incrementRowSize(char* row, uint64_t bytes) {
    VELOX_DCHECK(rowSizeOffset_);
    uint32_t* ptr = reinterpret_cast<uint32_t*>(row + rowSizeOffset_);
    uint64_t size = *ptr + bytes;
    *ptr = std::min<uint64_t>(size, std::numeric_limits<uint32_t>::max());
  }

  // Sets null flag for all specified groups to true.
  // For any given group, this method can be called at most once.
  void setAllNulls(char** groups, folly::Range<const vector_size_t*> indices) {
    for (auto i : indices) {
      groups[i][nullByte_] |= nullMask_;
    }
    numNulls_ += indices.size();
  }

  // Clears the null flag of 'group' if set. Returns true if the flag was set,
  // i.e. the caller is seeing the first non-null value for this group.
  inline bool clearNull(char* group) {
    if (numNulls_) {
      uint8_t mask = group[nullByte_];
      if (mask & nullMask_) {
        group[nullByte_] = mask & ~nullMask_;
        --numNulls_;
        return true;
      }
    }
    return false;
  }

  // Returns the fixed-width accumulator of 'group' interpreted as T.
  template <typename T>
  T* value(char* group) const {
    return reinterpret_cast<T*>(group + offset_);
  }

  // Returns mutable raw nulls of 'vector', or nullptr if the vector has no
  // nulls buffer to begin with.
  template <typename T>
  static uint64_t* getRawNulls(T* vector) {
    if (vector->mayHaveNulls()) {
      BufferPtr nulls = vector->mutableNulls(vector->size());
      return nulls->asMutable<uint64_t>();
    } else {
      return nullptr;
    }
  }

  // Marks 'index' of 'rawNulls' non-null. No-op if 'rawNulls' is nullptr.
  static void clearNull(uint64_t* rawNulls, vector_size_t index) {
    if (rawNulls) {
      bits::clearBit(rawNulls, index);
    }
  }

  const TypePtr resultType_;

  // Byte position of null flag in group row. Initialized to 0 so reads before
  // setOffsets() are well-defined.
  int32_t nullByte_ = 0;
  uint8_t nullMask_ = 0;
  // Offset of fixed length accumulator state in group row.
  int32_t offset_ = 0;

  // Offset of uint32_t row byte size of row. 0 if there are no
  // variable width fields or accumulators on the row. The size is
  // capped at 4G and will stay at 4G and not wrap around if growing
  // past this. This serves to track the batch size when extracting
  // rows. A size in excess of 4G would finish the batch in any case,
  // so larger values need not be represented.
  int32_t rowSizeOffset_ = 0;

  // Number of null accumulators in the current state of the aggregation
  // operator for this aggregate. If 0, clearing the null as part of update
  // is not needed.
  uint64_t numNulls_ = 0;

  // Not owned. nullptr until setAllocator() is called.
  HashStringAllocator* allocator_ = nullptr;

  // When selectivity vector has holes, in the pushdown, we need to generate a
  // different indices vector as the one we get from the DecodedVector is
  // simply sequential.
  std::vector<vector_size_t> pushdownCustomIndices_;
};
// Factory that creates an Aggregate instance for the given aggregation step,
// argument types and result type. One factory is registered per function name.
using AggregateFunctionFactory = std::function<std::unique_ptr<Aggregate>(
core::AggregationNode::Step step,
const std::vector<TypePtr>& argTypes,
const TypePtr& resultType)>;
/// Register an aggregate function with the specified name and signatures.
/// Returns true. Re-registering an existing name replaces its entry.
bool registerAggregateFunction(
const std::string& name,
std::vector<std::shared_ptr<AggregateFunctionSignature>> signatures,
AggregateFunctionFactory factory);
/// Returns signatures of the aggregate function with the specified name.
/// Returns empty std::optional if function with that name is not found.
std::optional<std::vector<std::shared_ptr<AggregateFunctionSignature>>>
getAggregateFunctionSignatures(const std::string& name);
// Registry entry: the signatures accepted by a function together with the
// factory that instantiates it.
struct AggregateFunctionEntry {
// All type signatures under which the function can be invoked.
std::vector<std::shared_ptr<AggregateFunctionSignature>> signatures;
// Creates an Aggregate instance for a matching signature.
AggregateFunctionFactory factory;
};
// Map from function name to its registry entry.
using AggregateFunctionMap =
std::unordered_map<std::string, AggregateFunctionEntry>;
// Returns the process-wide registry of aggregate functions.
AggregateFunctionMap& aggregateFunctions();
} // namespace facebook::velox::exec