query/sort_reduce.cu (291 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <thrust/iterator/zip_iterator.h>
#include <thrust/iterator/discard_iterator.h>
#include <thrust/gather.h>
#include <thrust/transform.h>
#include <cstring>
#include <algorithm>
#include <exception>
#include "algorithm.hpp"
#include "iterator.hpp"
#include "query/time_series_aggregate.h"
#include "memory.hpp"
// Sort is the C-callable entry point for sorting a dimension vector.
// It forwards to ares::sort on the supplied stream, then calls
// CheckCUDAError("Sort"). Any std::exception raised along the way is logged
// to std::cerr and reported through the returned handle instead of
// propagating to the caller.
//
// keys       - dimension vector forwarded to ares::sort.
// length     - number of entries forwarded to ares::sort.
// cudaStream - opaque stream handle, reinterpreted as a cudaStream_t.
// device     - device ordinal; only consulted when RUN_ON_DEVICE is defined.
//
// Returns a handle that is initialised to {nullptr, nullptr}; on failure its
// pStrErr field holds a strdup'ed copy of the exception message.
CGoCallResHandle Sort(DimensionVector keys,
                      int length,
                      void *cudaStream,
                      int device) {
  CGoCallResHandle handle = {nullptr, nullptr};
  try {
#ifdef RUN_ON_DEVICE
    cudaSetDevice(device);
#endif
    cudaStream_t stream = reinterpret_cast<cudaStream_t>(cudaStream);
    ares::sort(keys, length, stream);
    CheckCUDAError("Sort");
  } catch (const std::exception &err) {
    const char *message = err.what();
    std::cerr << "Exception happend when doing Sort:" << message
              << std::endl;
    // The message is duplicated because it must outlive the exception object.
    handle.pStrErr = strdup(message);
  }
  return handle;
}
// Reduce is the C-callable entry point for reducing values by dimension key.
// It forwards to ares::reduce on the supplied stream, stores the returned
// int in the handle's res field (cast to void *), and then calls
// CheckCUDAError("Reduce"). Any std::exception raised along the way is
// logged to std::cerr and reported through pStrErr.
//
// inputKeys / inputValues   - keys and raw value bytes forwarded as input.
// outputKeys / outputValues - destinations forwarded as output.
// valueBytes                - byte width selector forwarded to ares::reduce.
// length                    - number of input entries.
// aggFunc                   - aggregation function selector.
// stream                    - opaque stream handle, reinterpreted as a
//                             cudaStream_t.
// device                    - device ordinal; only consulted when
//                             RUN_ON_DEVICE is defined.
//
// Returns a handle initialised to {nullptr, nullptr}. Note that res is
// assigned before CheckCUDAError runs, so on a late failure both res and
// pStrErr may be set.
CGoCallResHandle Reduce(DimensionVector inputKeys,
                        uint8_t *inputValues,
                        DimensionVector outputKeys,
                        uint8_t *outputValues,
                        int valueBytes,
                        int length,
                        AggregateFunction aggFunc,
                        void *stream,
                        int device) {
  CGoCallResHandle handle = {nullptr, nullptr};
  try {
#ifdef RUN_ON_DEVICE
    cudaSetDevice(device);
#endif
    cudaStream_t typedStream = reinterpret_cast<cudaStream_t>(stream);
    int reducedLength = ares::reduce(inputKeys,
                                     inputValues,
                                     outputKeys,
                                     outputValues,
                                     valueBytes,
                                     length,
                                     aggFunc,
                                     typedStream);
    // The integer result is smuggled through the pointer-typed res field.
    handle.res = reinterpret_cast<void *>(reducedLength);
    CheckCUDAError("Reduce");
  } catch (const std::exception &err) {
    const char *message = err.what();
    std::cerr << "Exception happend when doing Reduce:" << message
              << std::endl;
    handle.pStrErr = strdup(message);
  }
  return handle;
}
// Expand is the C-callable entry point for expanding dimension rows by
// per-row counts. It selects the device via SET_DEVICE, forwards to
// ares::expand on the supplied stream, stores the returned int in the
// handle's res field (cast to void *), and then calls
// CheckCUDAError("Expand"). Any std::exception raised along the way is
// logged to std::cerr and reported through pStrErr.
//
// inputKeys         - source dimension vector.
// outputKeys        - destination dimension vector.
// baseCounts        - count array forwarded to ares::expand.
// indexVector       - index array forwarded to ares::expand.
// indexVectorLen    - number of entries in indexVector.
// outputOccupiedLen - number of rows already present in outputKeys.
// stream            - opaque stream handle, reinterpreted as a cudaStream_t.
// device            - device ordinal passed to SET_DEVICE.
//
// Returns a handle initialised to {nullptr, nullptr}. res is assigned before
// CheckCUDAError runs, so on a late failure both res and pStrErr may be set.
CGoCallResHandle Expand(DimensionVector inputKeys,
                        DimensionVector outputKeys,
                        uint32_t *baseCounts,
                        uint32_t *indexVector,
                        int indexVectorLen,
                        int outputOccupiedLen,
                        void *stream,
                        int device) {
  CGoCallResHandle handle = {nullptr, nullptr};
  try {
    SET_DEVICE(device);
    cudaStream_t typedStream = reinterpret_cast<cudaStream_t>(stream);
    int occupiedAfter = ares::expand(inputKeys,
                                     outputKeys,
                                     baseCounts,
                                     indexVector,
                                     indexVectorLen,
                                     outputOccupiedLen,
                                     typedStream);
    // The integer result is smuggled through the pointer-typed res field.
    handle.res = reinterpret_cast<void *>(occupiedAfter);
    CheckCUDAError("Expand");
  } catch (const std::exception &err) {
    const char *message = err.what();
    std::cerr << "Exception happend when doing Expand:" << message
              << std::endl;
    handle.pStrErr = strdup(message);
  }
  return handle;
}
namespace ares {
// Sorts a DimensionVector by the hash of its dimension values.
//
// Step 1 fills keys.HashValues[0, length) with the values produced by a
// DimensionHashIterator built over keys.DimValues, keys.NumDimsPerDimWidth,
// keys.VectorCapacity and keys.IndexVector.
// Step 2 stable-sorts those hashes and applies the same reordering to
// keys.IndexVector[0, length).
//
// Both keys.HashValues and keys.IndexVector are overwritten. All work is
// issued with the execution policy obtained from
// GET_EXECUTION_POLICY(cudaStream).
void sort(DimensionVector keys,
          int length,
          cudaStream_t cudaStream) {
  DimensionHashIterator<> hashBegin(keys.DimValues,
                                    keys.NumDimsPerDimWidth,
                                    keys.VectorCapacity,
                                    keys.IndexVector);
  auto hashValues = keys.HashValues;

  // Materialise one hash per entry so it can serve as the sort key.
  thrust::copy(GET_EXECUTION_POLICY(cudaStream),
               hashBegin,
               hashBegin + length,
               hashValues);

  // Order the hashes; the index vector is carried along as the payload.
  thrust::stable_sort_by_key(GET_EXECUTION_POLICY(cudaStream),
                             hashValues,
                             hashValues + length,
                             keys.IndexVector);
}
// Runs a single reduce_by_key pass for one concrete value type and
// aggregation functor.
//
// Keys are inputHashValues[0, length); consecutive equal hashes form one
// group. The reduced elements are (index, value) pairs, where the value is
// read from inputValues (viewed as Value *) through inputIndexVector. Pairs
// are combined with ReduceByHashFunctor<AggFunc>, and each group's result is
// written to outputIndexVector / outputValues (viewed as Value *).
//
// The reduced keys themselves go to a discard iterator, so outputHashValues
// is accepted but never written by this function.
//
// Returns the number of groups written to the output.
template<typename Value, typename AggFunc>
int reduceInternal(uint64_t *inputHashValues, uint32_t *inputIndexVector,
                   uint8_t *inputValues, uint64_t *outputHashValues,
                   uint32_t *outputIndexVector, uint8_t *outputValues,
                   int length, cudaStream_t cudaStream) {
  Value *typedInput = reinterpret_cast<Value *>(inputValues);
  Value *typedOutput = reinterpret_cast<Value *>(outputValues);

  // Values are visited in index-vector order rather than storage order.
  auto permutedValues =
      thrust::make_permutation_iterator(typedInput, inputIndexVector);
  auto inputBegin = thrust::make_zip_iterator(
      thrust::make_tuple(inputIndexVector, permutedValues));
  auto outputBegin = thrust::make_zip_iterator(
      thrust::make_tuple(outputIndexVector, typedOutput));

  AggFunc aggregate;
  ReduceByHashFunctor<AggFunc> combine(aggregate);
  thrust::equal_to<uint64_t> sameHash;

  auto ends = thrust::reduce_by_key(GET_EXECUTION_POLICY(cudaStream),
                                    inputHashValues,
                                    inputHashValues + length,
                                    inputBegin,
                                    thrust::make_discard_iterator(),
                                    outputBegin,
                                    sameHash,
                                    combine);
  // ends.second is the end of the written value range.
  return ends.second - outputBegin;
}
// Selects the concrete value type and aggregation functor for a reduction
// and dispatches to the matching reduceInternal instantiation.
//
// Dispatch rules:
//   - AGGR_SUM_*  : 32-bit type when valueBytes == 4, otherwise the 64-bit
//                   type of the same kind (unsigned / signed / floating).
//   - AGGR_MIN_* / AGGR_MAX_* : always the 32-bit type; valueBytes is not
//                   consulted.
//   - AGGR_AVG_FLOAT : uint64_t elements combined with RollingAvgFunctor
//                   (presumably a packed representation defined by that
//                   functor -- TODO confirm against its definition).
//
// All pointer arguments, length and cudaStream are forwarded unchanged.
//
// Returns the value returned by reduceInternal (number of reduced groups).
// Throws std::invalid_argument for an aggregation function not listed above.
int bindValueAndAggFunc(uint64_t *inputHashValues,
                        uint32_t *inputIndexVector,
                        uint8_t *inputValues,
                        uint64_t *outputHashValues,
                        uint32_t *outputIndexVector,
                        uint8_t *outputValues,
                        int valueBytes,
                        int length,
                        AggregateFunction aggFunc,
                        cudaStream_t cudaStream) {
// Expands to a reduceInternal call with the common argument list; only the
// template arguments vary per case. Undefined again after the switch so the
// helper does not leak into the rest of the translation unit.
#define REDUCE_INTERNAL(ValueType, AggFunc) \
  reduceInternal< ValueType, AggFunc >(     \
      inputHashValues,                      \
      inputIndexVector,                     \
      inputValues,                          \
      outputHashValues,                     \
      outputIndexVector,                    \
      outputValues,                         \
      length,                               \
      cudaStream)

  switch (aggFunc) {
    case AGGR_SUM_UNSIGNED:
      if (valueBytes == 4) {
        return REDUCE_INTERNAL(uint32_t, thrust::plus<uint32_t>);
      } else {
        return REDUCE_INTERNAL(uint64_t, thrust::plus<uint64_t>);
      }
    case AGGR_SUM_SIGNED:
      if (valueBytes == 4) {
        return REDUCE_INTERNAL(int32_t, thrust::plus<int32_t>);
      } else {
        return REDUCE_INTERNAL(int64_t, thrust::plus<int64_t>);
      }
    // Floating-point cases use float/double explicitly: float_t/double_t are
    // evaluation-format typedefs whose width depends on FLT_EVAL_METHOD, so
    // they are not guaranteed to match the 4-/8-byte element layout.
    case AGGR_SUM_FLOAT:
      if (valueBytes == 4) {
        return REDUCE_INTERNAL(float, thrust::plus<float>);
      } else {
        return REDUCE_INTERNAL(double, thrust::plus<double>);
      }
    case AGGR_MIN_UNSIGNED:
      return REDUCE_INTERNAL(uint32_t, thrust::minimum<uint32_t>);
    case AGGR_MIN_SIGNED:
      return REDUCE_INTERNAL(int32_t, thrust::minimum<int32_t>);
    case AGGR_MIN_FLOAT:
      return REDUCE_INTERNAL(float, thrust::minimum<float>);
    case AGGR_MAX_UNSIGNED:
      return REDUCE_INTERNAL(uint32_t, thrust::maximum<uint32_t>);
    case AGGR_MAX_SIGNED:
      return REDUCE_INTERNAL(int32_t, thrust::maximum<int32_t>);
    case AGGR_MAX_FLOAT:
      return REDUCE_INTERNAL(float, thrust::maximum<float>);
    case AGGR_AVG_FLOAT:
      return REDUCE_INTERNAL(uint64_t, RollingAvgFunctor);
    default:
      throw std::invalid_argument("Unsupported aggregation function type");
  }

#undef REDUCE_INTERNAL
}
// Reduces values by dimension hash and then copies the surviving dimension
// values into the output dimension vector.
//
// Phase 1 delegates to bindValueAndAggFunc, which writes the reduced values
// to outputValues and the matching row indices to outputKeys.IndexVector,
// and returns the number of reduced rows.
// Phase 2 copies dimension data from inputKeys.DimValues, addressed through
// outputKeys.IndexVector, into outputKeys.DimValues starting at offset 0.
//
// NOTE(review): the output iterator is built with inputKeys.VectorCapacity,
// not outputKeys.VectorCapacity -- presumably both vectors share the same
// capacity here; confirm against callers.
//
// Returns the number of reduced rows.
int reduce(DimensionVector inputKeys, uint8_t *inputValues,
           DimensionVector outputKeys, uint8_t *outputValues,
           int valueBytes, int length, AggregateFunction aggFunc,
           cudaStream_t cudaStream) {
  int reducedRows = bindValueAndAggFunc(inputKeys.HashValues,
                                        inputKeys.IndexVector,
                                        inputValues,
                                        outputKeys.HashValues,
                                        outputKeys.IndexVector,
                                        outputValues,
                                        valueBytes,
                                        length,
                                        aggFunc,
                                        cudaStream);

  // Total number of dimensions across all dimension widths.
  int totalDims = 0;
  for (int width = 0; width < NUM_DIM_WIDTH; ++width) {
    totalDims += inputKeys.NumDimsPerDimWidth[width];
  }

  DimensionColumnPermutateIterator source(inputKeys.DimValues,
                                          outputKeys.IndexVector,
                                          inputKeys.VectorCapacity,
                                          reducedRows,
                                          inputKeys.NumDimsPerDimWidth);
  DimensionColumnOutputIterator destination(outputKeys.DimValues,
                                            inputKeys.VectorCapacity,
                                            reducedRows,
                                            inputKeys.NumDimsPerDimWidth,
                                            0);

  // Two elements are copied per dimension per row (presumably the value and
  // its validity flag -- TODO confirm against the iterator definitions).
  thrust::copy(GET_EXECUTION_POLICY(cudaStream),
               source,
               source + totalDims * 2 * reducedRows,
               destination);
  return reducedRows;
}
// Expands input dimension rows into the output dimension vector, repeating
// each input entry according to its count, and appends the result after the
// rows already occupying the output.
//
// inputKeys         - source of dimension values.
// outputKeys        - destination; rows [0, outputOccupiedLen) are left
//                     untouched.
// baseCounts        - count array wrapped, together with indexVector, by
//                     IndexCountIterator to yield one count per entry.
// indexVector       - index array of length indexVectorLen.
// indexVectorLen    - number of entries to expand; values <= 0 are treated
//                     as "nothing to expand".
// outputOccupiedLen - number of rows already present in outputKeys.
// cudaStream        - stream used for all Thrust calls.
//
// The number of rows written is capped at the room left in the output
// (outputKeys.VectorCapacity - outputOccupiedLen). If there is no room left,
// or the counts sum to zero, nothing is written.
//
// Returns the total number of occupied rows in outputKeys after the call.
int expand(DimensionVector inputKeys,
           DimensionVector outputKeys,
           uint32_t *baseCounts,
           uint32_t *indexVector,
           int indexVectorLen,
           int outputOccupiedLen,
           cudaStream_t cudaStream) {
  if (indexVectorLen <= 0) {
    return outputOccupiedLen;
  }

  // create count iterator from baseCount and indexVector
  IndexCountIterator countIter = IndexCountIterator(baseCounts, indexVector);

  // total item counts by adding counts together
  uint32_t totalCount = thrust::reduce(GET_EXECUTION_POLICY(cudaStream),
                                       countIter,
                                       countIter + indexVectorLen);

  // Room left in the output, computed in signed 64-bit arithmetic. Comparing
  // the unsigned totalCount directly against a negative int difference would
  // turn the difference into a huge unsigned value and defeat the clamp.
  const int64_t remaining =
      static_cast<int64_t>(outputKeys.VectorCapacity) - outputOccupiedLen;
  if (totalCount == 0 || remaining <= 0) {
    // Nothing to write; also avoids addressing element 0 of an empty vector
    // further down.
    return outputOccupiedLen;
  }
  const int outputLen =
      static_cast<int>(std::min<int64_t>(totalCount, remaining));

  // scan the counts to obtain output offsets for each input element
  ares::device_vector<uint32_t> offsets(indexVectorLen);
  thrust::exclusive_scan(GET_EXECUTION_POLICY(cudaStream),
                         countIter,
                         countIter + indexVectorLen,
                         offsets.begin());

  // scatter the positions of entries with nonzero counts into their
  // corresponding output offsets
  // NOTE(review): the max-scan below relies on the untouched slots of
  // `indices` starting out as 0 -- presumably ares::device_vector
  // value-initializes its elements; confirm against memory.hpp.
  ares::device_vector<uint32_t> indices(totalCount);
  thrust::scatter_if(GET_EXECUTION_POLICY(cudaStream),
                     thrust::counting_iterator<uint32_t>(0),
                     thrust::counting_iterator<uint32_t>(indexVectorLen),
                     offsets.begin(),
                     countIter,
                     indices.begin());

  // compute max-scan over the indices, filling in the holes
  thrust::inclusive_scan(GET_EXECUTION_POLICY(cudaStream),
                         indices.begin(),
                         indices.end(),
                         indices.begin(),
                         thrust::maximum<uint32_t>());

  // get the raw pointer from device/host vector (non-empty at this point)
  uint32_t *newIndexVector = thrust::raw_pointer_cast(&indices[0]);

  // start the real copy operation
  DimensionColumnPermutateIterator iterIn(
      inputKeys.DimValues, newIndexVector, inputKeys.VectorCapacity,
      outputLen, inputKeys.NumDimsPerDimWidth);
  DimensionColumnOutputIterator iterOut(outputKeys.DimValues,
                                        outputKeys.VectorCapacity, outputLen,
                                        inputKeys.NumDimsPerDimWidth,
                                        outputOccupiedLen);

  int numDims = 0;
  for (int i = 0; i < NUM_DIM_WIDTH; i++) {
    numDims += inputKeys.NumDimsPerDimWidth[i];
  }

  // copy dim values into output; two elements per dimension per row
  // (presumably value plus validity flag -- TODO confirm)
  thrust::copy(GET_EXECUTION_POLICY(cudaStream), iterIn,
               iterIn + numDims * 2 * outputLen, iterOut);

  // return total count in the output dimensionVector
  return outputLen + outputOccupiedLen;
}
} // namespace ares