// query/hash_reduction.cu
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <thrust/iterator/counting_iterator.h>
#include <cstring>
#include <algorithm>
#include <exception>
#include <cstdio>
#include <cstdlib>
#include <cmath>     // float_t / double_t in the hash_reduction switch
#include <iostream>  // std::cerr in the exception handler
#include "algorithm.hpp"
#include "iterator.hpp"
#include "memory.hpp"
#include "query/time_series_aggregate.h"
#include "concurrent_unordered_map.hpp"
#include "utils.hpp"
CGoCallResHandle HashReduce(DimensionVector inputKeys,
uint8_t *inputValues,
DimensionVector outputKeys,
uint8_t *outputValues,
int valueBytes,
int length,
AggregateFunction aggFunc,
void *stream,
int device) {
CGoCallResHandle resHandle = {nullptr, nullptr};
try {
#ifndef SUPPORT_HASH_REDUCTION
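// Hash reduction is not compiled into this binary; report zero output rows.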
resHandle.res = reinterpret_cast<void *>(0);
#else
#ifdef RUN_ON_DEVICE
cudaSetDevice(device);
#endif
cudaStream_t cudaStream = reinterpret_cast<cudaStream_t>(stream);
resHandle.res = reinterpret_cast<void *>(ares::hash_reduction(inputKeys,
inputValues,
outputKeys,
outputValues,
valueBytes,
length,
aggFunc,
cudaStream));
CheckCUDAError("HashReduce");
#endif
}
catch (std::exception &e) {
std::cerr << "Exception happend when doing Reduce:" << e.what()
<< std::endl;
resHandle.pStrErr = strdup(e.what());
}
return resHandle;
}
#ifdef SUPPORT_HASH_REDUCTION
namespace ares {
template<typename map_type>
struct ExtractGroupByResultFunctor {
typedef typename map_type::key_type key_type;
typedef typename map_type::mapped_type element_type;
explicit ExtractGroupByResultFunctor(
map_type *const __restrict__ map,
uint8_t *dimInputValues,
size_t vectorCapacity,
uint8_t *dimOutputValues,
uint8_t dimWidthPrefixSum[NUM_DIM_WIDTH],
uint8_t *measureValues,
uint32_t *global_write_index) : map(map), dimInputValues(dimInputValues),
vectorCapacity(vectorCapacity),
dimOutputValues(dimOutputValues),
measureValues(measureValues),
global_write_index(
global_write_index) {
memcpy(this->dimWidthPrefixSum,
dimWidthPrefixSum, sizeof(uint8_t) * NUM_DIM_WIDTH);
}
map_type *map;
uint8_t *dimInputValues;
uint32_t *global_write_index;
uint8_t *measureValues;
uint8_t *dimOutputValues;
size_t vectorCapacity;
uint8_t dimWidthPrefixSum[NUM_DIM_WIDTH];
  // copy_dim_values copies the dim values (and their null bytes) of row
  // inputIdx in dimInputValues to row outputIdx in dimOutputValues.
__host_or_device__
void copy_dim_values(uint32_t inputIdx, size_t outputIdx) {
uint8_t * curDimInput = dimInputValues;
uint8_t * curDimOutput = dimOutputValues;
    // widthIdx is the index into NumDimsPerDimWidth.
int widthIdx = 0;
uint8_t totalDims = dimWidthPrefixSum[NUM_DIM_WIDTH - 1];
// write values
// pointer address for dim value d on row r is
// (base_ptr + accumulatedValueBytes * vectorCapacity) + r * dimValueBytes.
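    // For example (illustrative numbers only): with one 4-byte dim followed
    // by one 1-byte dim and vectorCapacity = 100, row 10 of the 4-byte dim
    // starts at base_ptr + 10 * 4, and row 10 of the 1-byte dim starts at
    // base_ptr + 4 * 100 + 10 * 1.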
for (uint8_t dimIndex = 0; dimIndex < totalDims; dimIndex += 1) {
// find correct widthIdx.
while (widthIdx < NUM_DIM_WIDTH
&& dimIndex >= dimWidthPrefixSum[widthIdx]) {
widthIdx++;
}
uint16_t dimValueBytes = 1 << (NUM_DIM_WIDTH - widthIdx - 1);
curDimInput += dimValueBytes * inputIdx;
curDimOutput += dimValueBytes * outputIdx;
setDimValue(curDimOutput, curDimInput, dimValueBytes);
// set to the start of next dim value
curDimInput += (vectorCapacity - inputIdx) * dimValueBytes;
curDimOutput += (vectorCapacity - outputIdx) * dimValueBytes;
}
// write nulls.
// Now both curDimInput and curDimOutput should point to start of
// null vector.
for (uint8_t dimIndex = 0; dimIndex < totalDims; dimIndex += 1) {
curDimInput += inputIdx;
curDimOutput += outputIdx;
*curDimOutput = *curDimInput;
// set to the start of next dim null vector
curDimInput += vectorCapacity - inputIdx;
curDimOutput += vectorCapacity - outputIdx;
}
}
__host_or_device__
void operator()(const int i) {
if (i < map->capacity()) {
key_type unusedKey = map->get_unused_key();
auto iter = ares::map_value_at(map, i);
key_type current_key = iter->first;
if (current_key != unusedKey) {
uint32_t outputIdx = ares::atomicAdd(global_write_index, (uint32_t)1);
copy_dim_values(static_cast<uint32_t>(current_key), outputIdx);
element_type value = iter->second;
reinterpret_cast<element_type *>(measureValues)[outputIdx] = value;
}
}
}
};
// Higher32BitsHasher returns the hash value for a given key of the
// concurrent map; it simply returns the upper 32 bits of the key. Note we
// cannot use a lambda for the hasher since concurrent_unordered_map needs
// the hasher's result_type typedef defined in the struct.
struct Higher32BitsHasher {
using result_type = uint32_t;
using key_type = int64_t;
__host_or_device__
result_type operator()(key_type key) const {
return static_cast<result_type>(key >> 32);
}
};
// HashReductionContext wraps the concurrent_unordered_map for hash reduction.
// The key of the hash map is an 8-byte integer whose upper 4 bytes are the
// hash value and whose lower 4 bytes are the dim row index. The hash value is
// pre-computed during insertion because we also use it for the equality check
// when a collision happens; the hash function of the map therefore just
// extracts the upper 32 bits of the key. The mapped_type of the hash map is
// the measure value itself, so we can use atomic functions for most aggregate
// functions. After the reduction is done, we output the result in the format
// of <index, aggregated_value>. Note we do not need to output hash values.
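// A minimal illustration of the key packing described above; pack_map_key is
// a hypothetical helper for exposition only and is not used by the query
// path. For a row whose dim hash is 0xAABBCCDD at dim row index 7, the
// packed key is 0xAABBCCDD00000007, and Higher32BitsHasher recovers
// 0xAABBCCDD from it via key >> 32.
__host_or_device__ inline int64_t pack_map_key(uint32_t hashValue,
                                               uint32_t rowIndex) {
  return (static_cast<int64_t>(hashValue) << 32) |
      static_cast<int64_t>(rowIndex);
}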
template<typename value_type, typename agg_func>
class HashReductionContext {
using key_type = int64_t;
private:
  // load_factor determines the hash map capacity: we allocate
  // capacity = length * load_factor so that even an input whose dimension
  // values are all distinct fits well under a full map, a reasonable
  // assumption for aggregation queries. Note we cannot make this assumption
  // if we are going to use this map for join.
constexpr static float load_factor = 2;
agg_func f;
uint32_t capacity;
uint32_t length;
value_type identity;
public:
explicit HashReductionContext(uint32_t length,
value_type identity)
: capacity(length * load_factor), length(length),
identity(identity) {
f = agg_func();
}
  // reduce reduces rows that share the same hash key into a single element
  // using the corresponding aggregation function. Note the key is actually a
  // <hash_value, index> pair but the equality check is only on hash_value.
  // Therefore the first index paired with a given hash value will be the one
  // that appears in the final output.
template<typename map_type>
void reduce(map_type *map,
DimensionVector inputKeys,
uint8_t *inputValues,
cudaStream_t cudaStream) {
DimensionHashIterator<32,
thrust::counting_iterator<uint32_t>>
hashIter(inputKeys.DimValues,
inputKeys.NumDimsPerDimWidth,
inputKeys.VectorCapacity);
auto rawMapKeyIter = thrust::make_zip_iterator(
thrust::make_tuple(hashIter, thrust::counting_iterator<uint32_t>(0)));
auto hashIndexFusionFunc = []
__host_or_device__(
typename decltype(rawMapKeyIter)::value_type tuple) {
return (static_cast<int64_t>(thrust::get<0>(tuple)) << 32)
| (static_cast<int64_t>(thrust::get<1>(tuple)));
};
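    // e.g. a tuple (hash = 0x12345678, index = 42) fuses into the 64-bit key
    // 0x123456780000002A; keys with the same hash compare equal below
    // regardless of their index half.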
auto mapKeyIter = thrust::make_transform_iterator(rawMapKeyIter,
hashIndexFusionFunc);
auto mapKeyValuePairIter = thrust::make_zip_iterator(
thrust::make_tuple(
mapKeyIter,
reinterpret_cast<value_type *>(inputValues)));
auto equality = []
__host_or_device__(key_type lhs, key_type rhs) {
return lhs >> 32 == rhs >> 32;
};
auto insertionFunc = [=]
__host_or_device__(
thrust::tuple<key_type, value_type> key_value_tuple) {
map->insert(tuple2pair(key_value_tuple), f, equality);
};
thrust::for_each_n(GET_EXECUTION_POLICY(cudaStream),
mapKeyValuePairIter,
length,
insertionFunc);
}
template<typename map_type>
int output(map_type *map,
DimensionVector inputKeys,
DimensionVector outputKeys, uint8_t *outputValue,
cudaStream_t cudaStream) {
    // calculate prefix sum of NumDimsPerDimWidth so that it will be easier
    // to tell the dim value bytes given a dim index. The prefix sum is
    // inclusive.
    uint8_t dimWidthPrefixSum[NUM_DIM_WIDTH];
for (int i = 0; i < NUM_DIM_WIDTH; i++) {
dimWidthPrefixSum[i] = inputKeys.NumDimsPerDimWidth[i];
if (i > 0) {
dimWidthPrefixSum[i] += dimWidthPrefixSum[i - 1];
}
}
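    // For example (assuming NUM_DIM_WIDTH == 5, i.e. widths 16/8/4/2/1
    // bytes): NumDimsPerDimWidth = {0, 1, 2, 0, 1} gives dimWidthPrefixSum =
    // {0, 1, 3, 3, 4}, so dim 0 is an 8-byte dim, dims 1 and 2 are 4-byte
    // dims, and dim 3 is a 1-byte dim.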
device_unique_ptr<uint32_t> globalWriteIndexDevice =
make_device_unique<uint32_t>(cudaStream);
ExtractGroupByResultFunctor<map_type> extractorFunc(
map, inputKeys.DimValues, inputKeys.VectorCapacity,
outputKeys.DimValues, dimWidthPrefixSum,
outputValue, globalWriteIndexDevice.get());
thrust::for_each_n(GET_EXECUTION_POLICY(cudaStream),
thrust::counting_iterator<uint32_t>(0),
capacity, extractorFunc);
// copy the reduced count back from GPU.
    uint32_t globalWriteIndexHost;
ares::asyncCopyDeviceToHost(
reinterpret_cast<void *>(&globalWriteIndexHost),
reinterpret_cast<void *>(globalWriteIndexDevice.get()),
sizeof(uint32_t), cudaStream);
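    // The stream must be synchronized so the async copy has finished before
    // the host reads globalWriteIndexHost below.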
ares::waitForCudaStream(cudaStream);
return globalWriteIndexHost;
}
  // execute first reduces the dimension values and measures into the hash
  // map, then extracts the aggregated result from it.
int execute(DimensionVector inputKeys, uint8_t *inputValues,
DimensionVector outputKeys, uint8_t *outputValues,
cudaStream_t cudaStream) {
auto equality = []
__host_or_device__(key_type lhs, key_type rhs) {
return (lhs >> 32) == (rhs >> 32);
};
typedef ares::hash_map<key_type,
value_type,
Higher32BitsHasher,
decltype(equality)> map_type;
    host_unique_ptr<map_type> mapHost =
        make_host_unique<map_type>(capacity, identity, 0,
                                   Higher32BitsHasher(), equality);
    device_unique_ptr<map_type> mapDevice =
        make_device_unique<map_type>(cudaStream, mapHost.get());
reduce(mapDevice.get(), inputKeys, inputValues, cudaStream);
return output(mapDevice.get(),
inputKeys,
outputKeys,
outputValues,
cudaStream);
}
};
// hashReduceInternal reduces the dimension values into a concurrent hash
// map using the aggregation function. After reduction it extracts the
// dimension and measure values from the hash map into the corresponding
// dimension vector and value vector.
template<typename value_type, typename agg_func_type>
int hashReduceInternal(DimensionVector inputKeys,
uint8_t *inputValues,
DimensionVector outputKeys,
uint8_t *outputValues,
int length,
value_type identity,
cudaStream_t cudaStream) {
HashReductionContext<value_type, agg_func_type> context(length, identity);
return context.execute(inputKeys,
inputValues,
outputKeys,
outputValues,
cudaStream);
}
// hash_reduction simply binds the measure value type and aggregate function
// type based on aggFunc and valueBytes, then dispatches to
// hashReduceInternal.
int hash_reduction(DimensionVector inputKeys, uint8_t *inputValues,
DimensionVector outputKeys, uint8_t *outputValues,
int valueBytes, int length, AggregateFunction aggFunc,
cudaStream_t cudaStream) {
int outputLength = 0;
switch (aggFunc) {
#define REDUCE_INTERNAL(value_type, agg_func_type) \
outputLength = hashReduceInternal< \
value_type, agg_func_type>( \
inputKeys, \
inputValues, \
outputKeys, \
outputValues, \
length, \
get_identity_value<value_type>(aggFunc), \
cudaStream); break;
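// For instance, REDUCE_INTERNAL(uint32_t, sum_op<uint32_t>) expands into a
// hashReduceInternal<uint32_t, sum_op<uint32_t>>(...) call assigned to
// outputLength; the trailing break in the macro is what ends each case.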
    case AGGR_SUM_UNSIGNED:
      if (valueBytes == 4) {
        REDUCE_INTERNAL(uint32_t, sum_op<uint32_t>)
      } else {
        REDUCE_INTERNAL(uint64_t, sum_op<uint64_t>)
      }
    case AGGR_SUM_SIGNED:
      if (valueBytes == 4) {
        REDUCE_INTERNAL(int32_t, sum_op<int32_t>)
      } else {
        REDUCE_INTERNAL(int64_t, sum_op<int64_t>)
      }
    case AGGR_SUM_FLOAT:
      if (valueBytes == 4) {
        REDUCE_INTERNAL(float_t, sum_op<float_t>)
      } else {
        REDUCE_INTERNAL(double_t, sum_op<double_t>)
      }
    case AGGR_MIN_UNSIGNED:
      REDUCE_INTERNAL(uint32_t, min_op<uint32_t>)
    case AGGR_MIN_SIGNED:
      REDUCE_INTERNAL(int32_t, min_op<int32_t>)
    case AGGR_MIN_FLOAT:
      REDUCE_INTERNAL(float_t, min_op<float_t>)
    case AGGR_MAX_UNSIGNED:
      REDUCE_INTERNAL(uint32_t, max_op<uint32_t>)
    case AGGR_MAX_SIGNED:
      REDUCE_INTERNAL(int32_t, max_op<int32_t>)
    case AGGR_MAX_FLOAT:
      REDUCE_INTERNAL(float_t, max_op<float_t>)
    case AGGR_AVG_FLOAT:
      REDUCE_INTERNAL(uint64_t, RollingAvgFunctor)
    default:
      throw std::invalid_argument("Unsupported aggregation function type");
}
return outputLength;
}
} // namespace ares
#endif