query/hll.cu (270 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include <thrust/iterator/discard_iterator.h> #include <thrust/transform.h> #include <algorithm> #include "query/algorithm.hpp" #include "query/memory.hpp" CGoCallResHandle HyperLogLog(DimensionVector prevDimOut, DimensionVector curDimOut, uint32_t *prevValuesOut, uint32_t *curValuesOut, int prevResultSize, int curBatchSize, bool isLastBatch, uint8_t **hllVectorPtr, size_t *hllVectorSizePtr, uint16_t **hllRegIDCountPerDimPtr, void *stream, int device) { CGoCallResHandle resHandle = {nullptr, nullptr}; try { #ifdef RUN_ON_DEVICE cudaSetDevice(device); #endif cudaStream_t cudaStream = reinterpret_cast<cudaStream_t>(stream); resHandle.res = reinterpret_cast<void *>(ares::hyperloglog(prevDimOut, curDimOut, prevValuesOut, curValuesOut, prevResultSize, curBatchSize, isLastBatch, hllVectorPtr, hllVectorSizePtr, hllRegIDCountPerDimPtr, cudaStream)); CheckCUDAError("HyperLogLog"); return resHandle; } catch (std::exception &e) { std::cerr << "Exception happend when doing HyperLogLog:" << e.what() << std::endl; resHandle.pStrErr = strdup(e.what()); } return resHandle; } namespace ares { // hll sort batch sort current batch using higher 48bits of the hash produced // from dim values + 16bit reg_id from hll value, // Note: we store the actual dim values of the current batch into the // prevDimOut.DimValues vector, but we write the index vector, hash vector, hll // value vector into the curDimOut, index vector will be initialized to range // [prevResultSize, preResultSize + curBatchSize) void sortCurrentBatch(uint8_t *dimValues, uint64_t *hashValues, uint32_t *indexVector, uint8_t numDimsPerDimWidth[NUM_DIM_WIDTH], int vectorCapacity, uint32_t *curValuesOut, int prevResultSize, int curBatchSize, cudaStream_t cudaStream) { DimensionHashIterator<> hashIter( dimValues, numDimsPerDimWidth, vectorCapacity, indexVector); auto zippedValueIter = thrust::make_zip_iterator( thrust::make_tuple(indexVector, curValuesOut)); thrust::transform( GET_EXECUTION_POLICY(cudaStream), hashIter, hashIter + curBatchSize, curValuesOut, hashValues, HLLHashFunctor()); thrust::stable_sort_by_key( GET_EXECUTION_POLICY(cudaStream), hashValues, hashValues + curBatchSize, zippedValueIter); } // prepareHeadFlags prepares dimHeadFlags determines whether a element is the // head of a dimension partition template<typename DimHeadIter> void prepareHeadFlags(uint64_t *hashVector, DimHeadIter dimHeadFlags, int resultSize, cudaStream_t cudaStream) { HLLDimNotEqualFunctor dimNotEqual; // TODO(jians): if we see performance issue here, we can try to use custome // kernel to utilize shared memory thrust::transform( GET_EXECUTION_POLICY(cudaStream), hashVector, hashVector + resultSize - 1, hashVector + 1, dimHeadFlags + 1, dimNotEqual); } // createAndCopyHLLVector creates the hll vector based on // scanned count of reg_id counts per dimension and copy hll value // reg_id < 4096, copy hll measure value in sparse format // reg_id >= 4096, copy hll measure value in dense format void createAndCopyHLLVector(uint64_t *hashVector, uint8_t **hllVectorPtr, size_t *hllVectorSizePtr, uint16_t **hllRegIDCountPerDimPtr, unsigned int *dimCumCount, uint32_t *values, int resultSizeWithRegIDs, int resultSize, cudaStream_t cudaStream) { HLLRegIDHeadFlagIterator regIDHeadFlagIterator(hashVector); // allocate dimRegIDCount vector ares::deviceMalloc(reinterpret_cast<void **>(hllRegIDCountPerDimPtr), (size_t)resultSize * sizeof(uint16_t)); // produce dimRegIDCount vector thrust::reduce_by_key( GET_EXECUTION_POLICY(cudaStream), dimCumCount, dimCumCount + resultSizeWithRegIDs, regIDHeadFlagIterator, thrust::make_discard_iterator(), *hllRegIDCountPerDimPtr); // iterator for get byte count for each dim according to reg id count auto hllDimByteCountIter = thrust::make_transform_iterator( *hllRegIDCountPerDimPtr, HLLDimByteCountFunctor()); auto hllDimRegIDCountIter = thrust::make_transform_iterator( *hllRegIDCountPerDimPtr, CastFunctor<uint16_t, uint64_t>()); // get dim reg id cumulative count (cumulative count of reg_id per each // dimension value) ares::device_vector<uint64_t> hllDimRegIDCumCount(resultSize + 1, 0); ares::device_vector<uint64_t> hllVectorOffsets(resultSize + 1, 0); ares::device_vector<uint64_t> hllRegIDCumCount(resultSizeWithRegIDs); thrust::inclusive_scan( GET_EXECUTION_POLICY(cudaStream), hllDimRegIDCountIter, hllDimRegIDCountIter + resultSize, hllDimRegIDCumCount.begin() + 1); thrust::inclusive_scan( GET_EXECUTION_POLICY(cudaStream), hllDimByteCountIter, hllDimByteCountIter + resultSize, hllVectorOffsets.begin() + 1); thrust::inclusive_scan( GET_EXECUTION_POLICY(cudaStream), regIDHeadFlagIterator, regIDHeadFlagIterator + resultSizeWithRegIDs, hllRegIDCumCount.begin()); *hllVectorSizePtr = hllVectorOffsets[resultSize]; HLLValueOutputIterator hllValueOutputIter( dimCumCount, values, thrust::raw_pointer_cast(hllRegIDCumCount.data()), thrust::raw_pointer_cast(hllDimRegIDCumCount.data()), thrust::raw_pointer_cast(hllVectorOffsets.data())); // allocate dense vector deviceMalloc(reinterpret_cast<void **>(hllVectorPtr), *hllVectorSizePtr); deviceMemset(*hllVectorPtr, 0, *hllVectorSizePtr); thrust::transform_if( GET_EXECUTION_POLICY(cudaStream), hllValueOutputIter, hllValueOutputIter + resultSizeWithRegIDs, regIDHeadFlagIterator, thrust::make_discard_iterator(), CopyHLLFunctor(*hllVectorPtr), thrust::identity<unsigned int>()); } // copyDim is the same as regular dimension copy in regular reduce operations void copyDim(DimensionVector inputKeys, DimensionVector outputKeys, int outputLength, cudaStream_t cudaStream) { DimensionColumnPermutateIterator iterIn( inputKeys.DimValues, outputKeys.IndexVector, inputKeys.VectorCapacity, outputLength, inputKeys.NumDimsPerDimWidth); DimensionColumnOutputIterator iterOut(outputKeys.DimValues, inputKeys.VectorCapacity, outputLength, inputKeys.NumDimsPerDimWidth, 0); int numDims = 0; for (int i = 0; i < NUM_DIM_WIDTH; i++) { numDims += inputKeys.NumDimsPerDimWidth[i]; } thrust::copy(GET_EXECUTION_POLICY(cudaStream), iterIn, iterIn + numDims * 2 * outputLength, iterOut); } // merge merges previous batch results with current batch results // based on hash value (asce) and hll value (desc) void merge(uint64_t *inputHashValues, uint64_t *outputHashValues, uint32_t *inputValues, uint32_t *outputValues, uint32_t *inputIndexVector, uint32_t *outputIndexVector, int prevResultSize, int curBatchResultSize, cudaStream_t cudaStream) { auto zippedPrevBatchMergeKey = thrust::make_zip_iterator( thrust::make_tuple(inputHashValues, inputValues)); auto zippedCurBatchMergeKey = thrust::make_zip_iterator(thrust::make_tuple( inputHashValues + prevResultSize, inputValues + prevResultSize)); auto zippedOutputKey = thrust::make_zip_iterator( thrust::make_tuple(outputHashValues, outputValues)); thrust::merge_by_key( GET_EXECUTION_POLICY(cudaStream), zippedPrevBatchMergeKey, zippedPrevBatchMergeKey + prevResultSize, zippedCurBatchMergeKey, zippedCurBatchMergeKey + curBatchResultSize, inputIndexVector, inputIndexVector + prevResultSize, zippedOutputKey, outputIndexVector, HLLMergeComparator()); } int reduceCurrentBatch(uint64_t *inputHashValues, uint32_t *inputIndexVector, uint32_t *inputValues, uint64_t *outputHashValues, uint32_t *outputIndexVector, uint32_t *outputValues, int length, cudaStream_t cudaStream) { thrust::equal_to<uint64_t> binaryPred; thrust::maximum<uint32_t> maxOp; ReduceByHashFunctor<thrust::maximum<uint32_t> > reduceFunc(maxOp); auto zippedInputIter = thrust::make_zip_iterator( thrust::make_tuple(inputIndexVector, inputValues)); auto zippedOutputIter = thrust::make_zip_iterator( thrust::make_tuple(outputIndexVector, outputValues)); auto resEnd = thrust::reduce_by_key( GET_EXECUTION_POLICY(cudaStream), inputHashValues, inputHashValues + length, zippedInputIter, outputHashValues, zippedOutputIter, binaryPred, reduceFunc); return thrust::get<0>(resEnd) - outputHashValues; } int makeHLLVector(uint64_t *hashValues, uint32_t *indexVector, uint32_t *values, int resultSize, uint8_t **hllVectorPtr, size_t *hllVectorSizePtr, uint16_t **hllRegIDCountPerDimPtr, cudaStream_t cudaStream) { ares::device_vector<unsigned int> dimHeadFlags(resultSize, 1); prepareHeadFlags(hashValues, dimHeadFlags.begin(), resultSize, cudaStream); int reducedResultSize = thrust::remove_if( GET_EXECUTION_POLICY(cudaStream), indexVector, indexVector + resultSize, dimHeadFlags.begin(), thrust::detail::equal_to_value<unsigned int>(0)) - indexVector; thrust::inclusive_scan( GET_EXECUTION_POLICY(cudaStream), dimHeadFlags.begin(), dimHeadFlags.end(), dimHeadFlags.begin()); createAndCopyHLLVector(hashValues, hllVectorPtr, hllVectorSizePtr, hllRegIDCountPerDimPtr, thrust::raw_pointer_cast(dimHeadFlags.data()), values, resultSize, reducedResultSize, cudaStream); return reducedResultSize; } // the steps for hyperloglog: // 1. sort current batch // 2. reduce current batch // 3. merge current batch result with result from previous batches // 4. (last batch only) create dense hll vector // 5. copy dimension values int hyperloglog(DimensionVector prevDimOut, DimensionVector curDimOut, uint32_t *prevValuesOut, uint32_t *curValuesOut, int prevResultSize, int curBatchSize, bool isLastBatch, uint8_t **hllVectorPtr, size_t *hllVectorSizePtr, uint16_t **hllRegIDCountPerDimPtr, cudaStream_t cudaStream) { sortCurrentBatch(prevDimOut.DimValues, curDimOut.HashValues, curDimOut.IndexVector, curDimOut.NumDimsPerDimWidth, curDimOut.VectorCapacity, curValuesOut, prevResultSize, curBatchSize, cudaStream); int curResultSize = reduceCurrentBatch( curDimOut.HashValues, curDimOut.IndexVector, curValuesOut, prevDimOut.HashValues + prevResultSize, prevDimOut.IndexVector + prevResultSize, prevValuesOut + prevResultSize, curBatchSize, cudaStream); merge(prevDimOut.HashValues, curDimOut.HashValues, prevValuesOut, curValuesOut, prevDimOut.IndexVector, curDimOut.IndexVector, prevResultSize, curResultSize, cudaStream); int resSize = prevResultSize + curResultSize; if (isLastBatch && resSize > 0) { resSize = makeHLLVector( curDimOut.HashValues, curDimOut.IndexVector, curValuesOut, resSize, hllVectorPtr, hllVectorSizePtr, hllRegIDCountPerDimPtr, cudaStream); } copyDim(prevDimOut, curDimOut, resSize, cudaStream); return resSize; } } // namespace ares