query/utils.hpp (235 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef QUERY_UTILS_HPP_ #define QUERY_UTILS_HPP_ #include <cuda_runtime.h> #include <thrust/pair.h> #include <thrust/tuple.h> #include <algorithm> #include <cfloat> #include <cmath> #include <cstdint> #include <exception> #include <type_traits> #include <stdexcept> #include <string> #include <tuple> #include <utility> #include "query/time_series_aggregate.h" #ifdef USE_RMM #include <rmm/thrust_rmm_allocator.h> #endif // We need this macro to define functions that can only be called in host // mode or device mode, but not both. The reason to have this mode is because // a "device and host" function can only call "device and host" function. They // cannot call device-only functions like "atomicAdd" even we call them under // RUN_ON_DEVICE macro. #ifdef RUN_ON_DEVICE #define __host_or_device__ __device__ #else #define __host_or_device__ __host__ #endif // This macro is for setting the correct thrust execution policy given whether // RUN_ON_DEVICE and USE_RMM #ifdef RUN_ON_DEVICE # ifdef USE_RMM # define GET_EXECUTION_POLICY(cudaStream) \ rmm::exec_policy(cudaStream)->on(cudaStream) # else # define GET_EXECUTION_POLICY(cudaStream) \ thrust::cuda::par.on(cudaStream) # endif #else # define GET_EXECUTION_POLICY(cudaStream) thrust::host #endif // This function will check the cuda error of current thread and throw an // exception if any. void CheckCUDAError(const char *message); // AlgorithmError represents a exception class that contains a error message. class AlgorithmError : public std::exception { protected: std::string message_; public: explicit AlgorithmError(const std::string &message); virtual const char *what() const throw(); }; namespace ares { // Parameters for custom kernel. const unsigned int WARP_SIZE = 32; const unsigned int STEP_SIZE = 64; const unsigned int BLOCK_SIZE = 512; // common_type determines the common type between type A and B, // that is the type both types can be implicitly converted to. template <typename A, typename B> struct common_type { typedef typename std::conditional< std::is_floating_point<A>::value || std::is_floating_point<B>::value, float_t, typename std::conditional< std::is_same<A, int64_t>::value || std::is_same<B, int64_t>::value, int64_t, typename std::conditional<std::is_signed<A>::value || std::is_signed<B>::value, int32_t, uint32_t>::type>::type>::type type; }; // Special common_type for GeoPointT template<> struct common_type<GeoPointT, GeoPointT> { typedef GeoPointT type; }; // Special common_type for UUIDT template<> struct common_type<UUIDT, UUIDT> { typedef UUIDT type; }; template<typename LHSIterator, typename RHSIterator> struct supported_binary_combination { static constexpr bool value = ((std::is_same< typename LHSIterator::value_type::head_type, UUIDT*>::value && (std::is_same< typename RHSIterator::value_type::head_type, UUIDT>::value || std::is_same< typename RHSIterator::value_type::head_type, int32_t>::value || std::is_same< typename RHSIterator::value_type::head_type, int>::value)) || (std::is_same< typename LHSIterator::value_type::head_type, GeoPointT*>::value && (std::is_same< typename RHSIterator::value_type::head_type, GeoPointT>::value || std::is_same< typename RHSIterator::value_type::head_type, int32_t>::value || std::is_same< typename RHSIterator::value_type::head_type, int>::value)) || (std::is_same< typename LHSIterator::value_type::head_type, GeoPointT>::value && std::is_same< typename RHSIterator::value_type::head_type, GeoPointT>::value) || (std::is_same< typename LHSIterator::value_type::head_type, UUIDT>::value && std::is_same< typename RHSIterator::value_type::head_type, UUIDT>::value) || (!std::is_same< typename LHSIterator::value_type::head_type, UUIDT*>::value && !std::is_same< typename LHSIterator::value_type::head_type, UUIDT>::value && !std::is_same< typename LHSIterator::value_type::head_type, GeoPointT>::value && !std::is_same< typename RHSIterator::value_type::head_type, UUIDT>::value && !std::is_same< typename RHSIterator::value_type::head_type, GeoPointT>::value && !std::is_same< typename LHSIterator::value_type::head_type, GeoPointT*>::value && !std::is_same< typename RHSIterator::value_type::head_type, UUIDT*>::value && !std::is_same< typename RHSIterator::value_type::head_type, GeoPointT*>::value)); }; // This is used to retrieve iterator value type // for non-array data type, will use common type, // likely will change later to support different types between left/right // for array data type, will use it's own data type template <typename A, typename B> struct input_iterator_value_type { typedef typename std::conditional< !std::is_pointer<A>::value && !std::is_pointer<B>::value, typename common_type<A, B>::type, A>::type type; }; // get_identity_value returns the identity value for the aggregation function. // Identity value is a special type of element of a set with respect to a // binary operation on that set, which leaves other elements unchanged when // combined with them. template <typename Value> __host__ __device__ Value get_identity_value(AggregateFunction aggFunc) { switch (aggFunc) { case AGGR_AVG_FLOAT:return 0; // zero avg and zero count. case AGGR_SUM_UNSIGNED: case AGGR_SUM_SIGNED: case AGGR_SUM_FLOAT:return 0; case AGGR_MIN_UNSIGNED:return static_cast<Value>(UINT32_MAX); case AGGR_MIN_SIGNED:return static_cast<Value>(INT32_MAX); case AGGR_MIN_FLOAT:return static_cast<Value>(FLT_MAX); case AGGR_MAX_UNSIGNED:return 0; case AGGR_MAX_SIGNED:return static_cast<Value>(INT32_MIN); case AGGR_MAX_FLOAT:return static_cast<Value>(FLT_MIN); default:return 0; } } inline uint8_t getStepInBytes(DataType dataType) { switch (dataType) { case Bool: case Int8: case Uint8:return 1; case Int16: case Uint16:return 2; case Int32: case Uint32: case Float32:return 4; case GeoPoint: case Int64: case Uint64: return 8; case UUID: return 16; default: throw std::invalid_argument( "Unsupported data type for VectorPartyInput"); } } inline __host__ __device__ void setDimValue(uint8_t *outPtr, uint8_t *inPtr, uint16_t dimBytes) { switch (dimBytes) { case 16: *reinterpret_cast<UUIDT *>(outPtr) = *reinterpret_cast<UUIDT *>(inPtr); case 8: *reinterpret_cast<uint64_t *>(outPtr) = *reinterpret_cast<uint64_t *>(inPtr); case 4: *reinterpret_cast<uint32_t *>(outPtr) = *reinterpret_cast<uint32_t *>(inPtr); case 2: *reinterpret_cast<uint16_t *>(outPtr) = *reinterpret_cast<uint16_t *>(inPtr); case 1:*outPtr = *inPtr; } } template<typename kernel> void calculateDim3(int *grid_size, int *block_size, size_t size, kernel k) { int min_grid_size; cudaOccupancyMaxPotentialBlockSize(&min_grid_size, block_size, k); CheckCUDAError("cudaOccupancyMaxPotentialBlockSize"); // find needed gridsize size_t needed_grid_size = (size + *block_size - 1) / *block_size; *grid_size = static_cast<int>(std::min(static_cast<size_t>(min_grid_size), needed_grid_size)); } // Set of atomicAdd operator wrappers. // In device mode, they will call cuda atomicX. // In host mode, they will just do the addition without atomicity guarantee as // std::atomic protects on memory managed by itself instead of on a passed-in // address. This is ok for now since for host mode algorithms are not running // in parallel. // TODO(lucafuji): find atomic libraries on host. #ifdef RUN_ON_DEVICE template <typename val_type> __host__ __device__ inline val_type atomicAdd(val_type* address, val_type val) { return ::atomicAdd(address, val); } #else template <typename val_type> __host__ __device__ inline val_type atomicAdd(val_type* address, val_type val) { val_type old = *address; *address += val; return old; } #endif // GPU memory access has to be aligned to 1, 2, 4, 8, 16 bytes // http://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#device-memory-accesses // therefore we do byte to byte comparison here inline __host__ __device__ bool memequal(const uint8_t *lhs, const uint8_t *rhs, int bytes) { for (int i = 0; i < bytes; i++) { if (lhs[i] != rhs[i]) { return false; } } return true; } template<typename t1, typename t2> __host__ __device__ thrust::pair<t1, t2> tuple2pair(thrust::tuple<t1, t2> t) { return thrust::make_pair(thrust::get<0>(t), thrust::get<1>(t)); } __host__ __device__ uint32_t murmur3sum32(const uint8_t *key, int bytes, uint32_t seed); __host__ __device__ void murmur3sum128(const uint8_t *key, int len, uint32_t seed, uint64_t *out); template<int hash_bytes = 64> struct hash_output_type { using type = uint64_t; }; template<> struct hash_output_type<32> { using type = uint32_t; }; template<int hash_bytes = 64> __host__ __device__ inline typename hash_output_type<hash_bytes>::type murmur3sum(const uint8_t *key, int bytes, uint32_t seed) { uint64_t hashedOutput[2]; murmur3sum128(key, bytes, seed, hashedOutput); return hashedOutput[0]; } template<> __host__ __device__ inline typename hash_output_type<32>::type murmur3sum<32>(const uint8_t *key, int bytes, uint32_t seed) { return murmur3sum32(key, bytes, seed); } } // namespace ares #endif // QUERY_UTILS_HPP_