velox/exec/HashTable.cpp (1,109 lines of code) (raw):

/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "velox/exec/HashTable.h" #include "velox/common/base/Portability.h" #include "velox/common/base/SimdUtil.h" #include "velox/common/process/ProcessBase.h" #include "velox/exec/ContainerRowSerde.h" #include "velox/vector/VectorTypeUtils.h" namespace facebook::velox::exec { template <TypeKind Kind> static int32_t kindSize() { return sizeof(typename KindToFlatVector<Kind>::HashRowType); } static int32_t typeKindSize(TypeKind kind) { return VELOX_DYNAMIC_TYPE_DISPATCH(kindSize, kind); } template <bool ignoreNullKeys> HashTable<ignoreNullKeys>::HashTable( std::vector<std::unique_ptr<VectorHasher>>&& hashers, const std::vector<std::unique_ptr<Aggregate>>& aggregates, const std::vector<TypePtr>& dependentTypes, bool allowDuplicates, bool isJoinBuild, bool hasProbedFlag, memory::MappedMemory* mappedMemory) : BaseHashTable(std::move(hashers)), aggregates_(aggregates), isJoinBuild_(isJoinBuild) { std::vector<TypePtr> keys; for (auto& hasher : hashers_) { keys.push_back(hasher->type()); if (!VectorHasher::typeKindSupportsValueIds(hasher->typeKind())) { hashMode_ = HashMode::kHash; } } rows_ = std::make_unique<RowContainer>( keys, !ignoreNullKeys, aggregates, dependentTypes, allowDuplicates, isJoinBuild, hasProbedFlag, hashMode_ != HashMode::kHash, mappedMemory, ContainerRowSerde::instance()); nextOffset_ = rows_->nextOffset(); } class ProbeState { public: enum class Operation { kProbe, kInsert, kErase }; // Special tag for an erased entry. This counts as occupied for // probe and as empty for insert. If a tag word with empties gets an // erase, we make the erased tag empty. If the tag word getting the // erase has no empties, the erase is marked with a tombstone. A // probe always stops with a tag word with empties. Adding an empty // to a tag word with no empties would break probes that needed to // skip this tag word. This is standard practice for open addressing // hash tables. F14 has more sophistication in this but we do not // need it here since erase is very rare and is not expected to // change the load factor by much in the expected uses. static constexpr uint8_t kTombstoneTag = 0x7f; static constexpr int32_t kFullMask = 0xffff; static constexpr BaseHashTable::TagVector kTombstoneGroup = { 0x7f7f7f7f7f7f7f7fUL, 0x7f7f7f7f7f7f7f7fUL}; static constexpr BaseHashTable::TagVector kEmptyGroup = {0, 0}; static inline int32_t tagsByteOffset(uint64_t hash, uint64_t sizeMask) { return (hash & sizeMask) & ~(sizeof(BaseHashTable::TagVector) - 1); } int32_t row() const { return row_; } // Use one instruction to load 16 tags // Use another instruction to make 16 copies of the tag being searched for inline void preProbe(uint8_t* tags, uint64_t sizeMask, uint64_t hash, int32_t row) { row_ = row; tagIndex_ = tagsByteOffset(hash, sizeMask); tagsInTable_ = BaseHashTable::loadTags(tags, tagIndex_); auto tag = BaseHashTable::hashTag(hash); wantedTags_ = _mm_set1_epi8(tag); group_ = nullptr; indexInTags_ = kNotSet; } // Use one instruction to compare the tag being searched for to 16 tags // If there is a match, load corresponding data from the table template <Operation op = Operation::kProbe> inline void firstProbe(char** table, int32_t firstKey) { auto tagMatches = _mm_cmpeq_epi8(tagsInTable_, wantedTags_); hits_ = _mm_movemask_epi8(tagMatches); if (hits_) { loadNextHit<op>(table, firstKey); } } template <Operation op, typename Compare, typename Insert> inline char* FOLLY_NULLABLE fullProbe( uint8_t* tags, char** table, uint64_t sizeMask, int32_t firstKey, Compare compare, Insert insert, bool extraCheck = false) { if (group_ && compare(group_, row_)) { if (op == Operation::kErase) { eraseHit(tags); } return group_; } auto alreadyChecked = group_; if (extraCheck) { tagsInTable_ = BaseHashTable::loadTags(tags, tagIndex_); auto tagMatches = _mm_cmpeq_epi8(tagsInTable_, wantedTags_); hits_ = _mm_movemask_epi8(tagMatches); } int32_t insertTagIndex = -1; for (;;) { if (!hits_) { uint16_t empty = _mm_movemask_epi8(_mm_cmpeq_epi8(tagsInTable_, kEmptyGroup)) & kFullMask; if (empty) { if (op == Operation::kProbe) { return nullptr; } if (op == Operation::kErase) { VELOX_FAIL("Erasing non-existing entry"); } if (indexInTags_ != kNotSet) { // We came to the end of the probe without a hit. We replace the // first tombstone on the way. return insert(row_, insertTagIndex + indexInTags_); } auto pos = bits::getAndClearLastSetBit(empty); return insert(row_, tagIndex_ + pos); } else if (op == Operation::kInsert && indexInTags_ == kNotSet) { // We passed through a full group. uint16_t tombstones = _mm_movemask_epi8(_mm_cmpeq_epi8(tagsInTable_, kTombstoneGroup)) & kFullMask; if (tombstones) { insertTagIndex = tagIndex_; indexInTags_ = bits::getAndClearLastSetBit(tombstones); } } } else { loadNextHit<op>(table, firstKey); if (!(extraCheck && group_ == alreadyChecked) && compare(group_, row_)) { if (op == Operation::kErase) { eraseHit(tags); } return group_; } continue; } tagIndex_ = (tagIndex_ + sizeof(BaseHashTable::TagVector)) & sizeMask; tagsInTable_ = BaseHashTable::loadTags(tags, tagIndex_); auto tagMatches = _mm_cmpeq_epi8(tagsInTable_, wantedTags_); hits_ = _mm_movemask_epi8(tagMatches) & kFullMask; } } private: static constexpr uint8_t kNotSet = 0xff; template <Operation op> inline void loadNextHit(char** table, int32_t firstKey) { int32_t hit = bits::getAndClearLastSetBit(hits_); if (op == Operation::kErase) { indexInTags_ = hit; } group_ = BaseHashTable::loadRow(table, tagIndex_ + hit); __builtin_prefetch(group_ + firstKey); } void eraseHit(uint8_t* tags) { auto empty = _mm_movemask_epi8(_mm_cmpeq_epi8(tagsInTable_, kEmptyGroup)); BaseHashTable::storeTag( tags, tagIndex_ + indexInTags_, empty ? 0 : kTombstoneTag); } char* group_; BaseHashTable::TagVector wantedTags_; BaseHashTable::TagVector tagsInTable_; int32_t row_; int32_t tagIndex_; BaseHashTable::MaskType hits_; // If op is kErase, this is the index of the current hit within the // group of 'tagIndex_'. If op is kInsert, this is the index of the // first tombstone in the group of 'insertTagIndex_'. Insert // replaces the first tombstone it finds. If it finds an empty // before finding a tombstone, it replaces the empty as soon as it // sees it. But the tombstone can be replaced only after finding an // empty and thus determining that the item being inserted is not in // the table. uint8_t indexInTags_ = kNotSet; }; template <bool ignoreNullKeys> void HashTable<ignoreNullKeys>::storeKeys( HashLookup& lookup, vector_size_t row) { for (int32_t i = 0; i < hashers_.size(); ++i) { auto& hasher = hashers_[i]; rows_->store(hasher->decodedVector(), row, lookup.hits[row], i); // NOLINT } } template <bool ignoreNullKeys> void HashTable<ignoreNullKeys>::storeRowPointer( int32_t index, uint64_t hash, char* row) { if (hashMode_ != HashMode::kArray) { tags_[index] = hashTag(hash); } table_[index] = row; } template <bool ignoreNullKeys> char* HashTable<ignoreNullKeys>::insertEntry( HashLookup& lookup, int32_t index, vector_size_t row) { char* group = rows_->newRow(); lookup.hits[row] = group; // NOLINT storeKeys(lookup, row); storeRowPointer(index, lookup.hashes[row], group); if (hashMode_ == HashMode::kNormalizedKey) { // We store the unique digest of key values (normalized key) in // the word below the row. Space was reserved in the allocation // unless we have given up on normalized keys. RowContainer::normalizedKey(group) = lookup.normalizedKeys[row]; // NOLINT } ++numDistinct_; lookup.newGroups.push_back(row); return group; } template <bool ignoreNullKeys> bool HashTable<ignoreNullKeys>::compareKeys( const char* group, HashLookup& lookup, vector_size_t row) { int32_t numKeys = lookup.hashers.size(); // The loop runs at least once. Allow for first comparison to fail // before loop end check. int32_t i = 0; do { auto& hasher = lookup.hashers[i]; if (!rows_->equals<!ignoreNullKeys>( group, rows_->columnAt(i), hasher->decodedVector(), row)) { return false; } } while (++i < numKeys); return true; } template <bool ignoreNullKeys> bool HashTable<ignoreNullKeys>::compareKeys( const char* group, const char* inserted) { auto numKeys = hashers_.size(); int32_t i = 0; do { if (rows_->compare(group, inserted, i, CompareFlags{true, true})) { return false; } } while (++i < numKeys); return true; } template <bool ignoreNullKeys> template <bool isJoin> FOLLY_ALWAYS_INLINE void HashTable<ignoreNullKeys>::fullProbe( HashLookup& lookup, ProbeState& state, bool extraCheck) { constexpr ProbeState::Operation op = isJoin ? ProbeState::Operation::kProbe : ProbeState::Operation::kInsert; if (hashMode_ == HashMode::kNormalizedKey) { // NOLINT lookup.hits[state.row()] = state.fullProbe<op>( tags_, table_, sizeMask_, -static_cast<int32_t>(sizeof(normalized_key_t)), [&](char* group, int32_t row) INLINE_LAMBDA { return RowContainer::normalizedKey(group) == lookup.normalizedKeys[row]; }, [&](int32_t index, int32_t row) { return isJoin ? nullptr : insertEntry(lookup, row, index); }, !isJoin && extraCheck); return; } // NOLINT lookup.hits[state.row()] = state.fullProbe<op>( tags_, table_, sizeMask_, 0, [&](char* group, int32_t row) { return compareKeys(group, lookup, row); }, [&](int32_t index, int32_t row) { return isJoin ? nullptr : insertEntry(lookup, row, index); }, !isJoin && extraCheck); } namespace { // A normalized key is spread evenly over 64 bits. This mixes the bits // so that they affect the low 'bits', which are actually used for // indexing the hash table. inline uint64_t mixNormalizedKey(uint64_t k, uint8_t bits) { constexpr uint64_t prime1 = 0xc6a4a7935bd1e995UL; // M from Murmurhash. constexpr uint64_t prime2 = 527729; constexpr uint64_t prime3 = 28047; auto h = (k ^ ((k >> 32))) * prime1; return h + (h >> bits) * prime2 + (h >> (2 * bits)) * prime3; } void populateNormalizedKeys(HashLookup& lookup, int8_t sizeBits) { lookup.normalizedKeys.resize(lookup.rows.back() + 1); auto hashes = lookup.hashes.data(); for (auto row : lookup.rows) { auto hash = hashes[row]; lookup.normalizedKeys[row] = hash; // NOLINT hashes[row] = mixNormalizedKey(hash, sizeBits); } } } // namespace template <bool ignoreNullKeys> void HashTable<ignoreNullKeys>::groupProbe(HashLookup& lookup) { if (hashMode_ == HashMode::kArray) { arrayGroupProbe(lookup); return; } // Do size-based rehash before mixing hashes from normalized keys // because the size of the table affects the mixing. checkSize(lookup.rows.size()); if (hashMode_ == HashMode::kNormalizedKey) { populateNormalizedKeys(lookup, sizeBits_); } ProbeState state1; ProbeState state2; ProbeState state3; ProbeState state4; int32_t probeIndex = 0; int32_t numProbes = lookup.rows.size(); auto rows = lookup.rows.data(); for (; probeIndex + 4 <= numProbes; probeIndex += 4) { int32_t row = rows[probeIndex]; state1.preProbe(tags_, sizeMask_, lookup.hashes[row], row); row = rows[probeIndex + 1]; state2.preProbe(tags_, sizeMask_, lookup.hashes[row], row); row = rows[probeIndex + 2]; state3.preProbe(tags_, sizeMask_, lookup.hashes[row], row); row = rows[probeIndex + 3]; state4.preProbe(tags_, sizeMask_, lookup.hashes[row], row); state1.firstProbe<ProbeState::Operation::kInsert>(table_, 0); state2.firstProbe<ProbeState::Operation::kInsert>(table_, 0); state3.firstProbe<ProbeState::Operation::kInsert>(table_, 0); state4.firstProbe<ProbeState::Operation::kInsert>(table_, 0); fullProbe<false>(lookup, state1, false); fullProbe<false>(lookup, state2, true); fullProbe<false>(lookup, state3, true); fullProbe<false>(lookup, state4, true); } for (; probeIndex < numProbes; ++probeIndex) { int32_t row = rows[probeIndex]; state1.preProbe(tags_, sizeMask_, lookup.hashes[row], row); state1.firstProbe(table_, 0); fullProbe<false>(lookup, state1, false); } initializeNewGroups(lookup); } template <bool ignoreNullKeys> void HashTable<ignoreNullKeys>::arrayGroupProbe(HashLookup& lookup) { VELOX_DCHECK(!lookup.hashes.empty()); VELOX_DCHECK(!lookup.hits.empty()); int32_t numProbes = lookup.rows.size(); const vector_size_t* rows = lookup.rows.data(); auto hashes = lookup.hashes.data(); auto groups = lookup.hits.data(); int32_t i = 0; if (process::hasAvx2() && simd::isDense(rows, numProbes)) { auto allZero = simd::Vectors<int64_t>::setAll(0); constexpr int32_t kWidth = simd::Vectors<int64_t>::VSize; auto start = rows[0]; auto end = start + numProbes - kWidth; for (i = start; i <= end; i += kWidth) { auto loaded = simd::Vectors<int64_t>::gather64( table_, simd::Vectors<int64_t>::load(hashes + i)); *simd::Vectors<int64_t>::pointer(groups + i) = loaded; auto misses = simd::Vectors<int64_t>::compareBitMask( simd::Vectors<int64_t>::compareEq(loaded, allZero)); if (LIKELY(!misses)) { continue; } for (auto miss = 0; miss < kWidth; ++miss) { auto row = i + miss; if (!groups[row]) { auto index = hashes[row]; auto hit = table_[index]; if (!hit) { hit = insertEntry(lookup, index, row); } groups[row] = hit; } } } i -= start; } for (; i < numProbes; ++i) { auto row = rows[i]; uint64_t index = hashes[row]; VELOX_DCHECK(index < size_); char* group = table_[index]; if (UNLIKELY(!group)) { group = insertEntry(lookup, index, row); } groups[row] = group; lookup.hits[row] = group; // NOLINT } initializeNewGroups(lookup); } template <bool ignoreNullKeys> void HashTable<ignoreNullKeys>::joinProbe(HashLookup& lookup) { if (hashMode_ == HashMode::kArray) { for (auto row : lookup.rows) { auto index = lookup.hashes[row]; DCHECK(index < size_); lookup.hits[row] = table_[index]; // NOLINT } return; } if (hashMode_ == HashMode::kNormalizedKey) { populateNormalizedKeys(lookup, sizeBits_); } int32_t probeIndex = 0; int32_t numProbes = lookup.rows.size(); const vector_size_t* rows = lookup.rows.data(); ProbeState state1; ProbeState state2; ProbeState state3; ProbeState state4; for (; probeIndex + 4 <= numProbes; probeIndex += 4) { int32_t row = rows[probeIndex]; state1.preProbe(tags_, sizeMask_, lookup.hashes[row], row); row = rows[probeIndex + 1]; state2.preProbe(tags_, sizeMask_, lookup.hashes[row], row); row = rows[probeIndex + 2]; state3.preProbe(tags_, sizeMask_, lookup.hashes[row], row); row = rows[probeIndex + 3]; state4.preProbe(tags_, sizeMask_, lookup.hashes[row], row); state1.firstProbe(table_, 0); state2.firstProbe(table_, 0); state3.firstProbe(table_, 0); state4.firstProbe(table_, 0); fullProbe<true>(lookup, state1, false); fullProbe<true>(lookup, state2, false); fullProbe<true>(lookup, state3, false); fullProbe<true>(lookup, state4, false); } for (; probeIndex < numProbes; ++probeIndex) { int32_t row = rows[probeIndex]; state1.preProbe(tags_, sizeMask_, lookup.hashes[row], row); state1.firstProbe(table_, 0); fullProbe<true>(lookup, state1, false); } } template <bool ignoreNullKeys> void HashTable<ignoreNullKeys>::initializeNewGroups(HashLookup& lookup) { if (lookup.newGroups.empty()) { return; } for (auto& aggregate : aggregates_) { aggregate->initializeNewGroups(lookup.hits.data(), lookup.newGroups); } } template <bool ignoreNullKeys> void HashTable<ignoreNullKeys>::allocateTables(uint64_t size) { VELOX_CHECK(bits::isPowerOfTwo(size), "Size is not a power of two: {}", size); if (size > 0) { size_ = size; sizeMask_ = size_ - 1; sizeBits_ = __builtin_popcountll(sizeMask_); constexpr auto kPageSize = memory::MappedMemory::kPageSize; // The total size is 9 bytes per slot, 8 in the pointers table and 1 in the // tags table. auto numPages = bits::roundUp(size * 9, kPageSize) / kPageSize; if (!rows_->mappedMemory()->allocateContiguous( numPages, nullptr, tableAllocation_)) { VELOX_FAIL("Could not allocate join/group by hash table"); } table_ = tableAllocation_.data<char*>(); tags_ = reinterpret_cast<uint8_t*>(table_ + size); memset(tags_, 0, size_); // Not strictly necessary to clear 'table_' but more debuggable. memset(table_, 0, size_ * sizeof(char*)); } } template <bool ignoreNullKeys> void HashTable<ignoreNullKeys>::clear() { rows_->clear(); if (hashMode_ != HashMode::kArray && tags_) { memset(tags_, 0, size_); } if (table_) { memset(table_, 0, sizeof(char*) * size_); } numDistinct_ = 0; } template <bool ignoreNullKeys> void HashTable<ignoreNullKeys>::checkSize(int32_t numNew) { if (!table_ || !size_) { // Initial guess of cardinality is double the first input batch or at // least 2K entries. // numDistinct_ is non-0 when switching from HashMode::kArray to regular // hashing. auto newSize = std::max( (uint64_t)2048, bits::nextPowerOfTwo(numNew * 2 + numDistinct_)); allocateTables(newSize); if (numDistinct_) { rehash(); } } else if (numNew + numDistinct_ > size_ - (size_ / 8)) { // This implements the F14 load factor: Resize if less than 1/8 unoccupied. auto newSize = bits::nextPowerOfTwo(size_ + numNew); allocateTables(newSize); rehash(); } } template <bool ignoreNullKeys> bool HashTable<ignoreNullKeys>::insertBatch( char** groups, int32_t numGroups, raw_vector<uint64_t>& hashes) { for (int32_t i = 0; i < hashers_.size(); ++i) { auto& hasher = hashers_[i]; if (hashMode_ == HashMode::kHash) { rows_->hash( i, folly::Range<char**>(groups, numGroups), i > 0, hashes.data()); } else { // Array or normalized key. auto column = rows_->columnAt(i); if (!hasher->computeValueIdsForRows( groups, numGroups, column.offset(), column.nullByte(), ignoreNullKeys ? 0 : column.nullMask(), hashes)) { // Must reconsider 'hashMode_' and start over. return false; } } } if (isJoinBuild_) { insertForJoin(groups, hashes.data(), numGroups); } else { insertForGroupBy(groups, hashes.data(), numGroups); } return true; } template <bool ignoreNullKeys> void HashTable<ignoreNullKeys>::insertForGroupBy( char** groups, uint64_t* hashes, int32_t numGroups) { if (hashMode_ == HashMode::kArray) { for (auto i = 0; i < numGroups; ++i) { auto index = hashes[i]; VELOX_CHECK_LT(index, size_); VELOX_CHECK_NULL(table_[index]); table_[index] = groups[i]; } } else { if (hashMode_ == HashMode::kNormalizedKey) { for (int i = 0; i < numGroups; ++i) { auto hash = hashes[i]; // Write the normalized key below the row. RowContainer::normalizedKey(groups[i]) = hash; // Shuffle the bits im the normalized key. hashes[i] = mixNormalizedKey(hash, sizeBits_); } } for (int32_t i = 0; i < numGroups; ++i) { auto hash = hashes[i]; auto tagIndex = ProbeState::tagsByteOffset(hash, sizeMask_); auto tagsInTable = BaseHashTable::loadTags(tags_, tagIndex); for (;;) { MaskType free = ~_mm_movemask_epi8(tagsInTable) & ProbeState::kFullMask; if (free) { auto freeOffset = bits::getAndClearLastSetBit(free); storeRowPointer(tagIndex + freeOffset, hash, groups[i]); break; } tagIndex = (tagIndex + sizeof(TagVector)) & sizeMask_; tagsInTable = loadTags(tags_, tagIndex); } } } } template <bool ignoreNullKeys> bool HashTable<ignoreNullKeys>::arrayPushRow(char* row, int32_t index) { auto existing = table_[index]; if (nextOffset_) { nextRow(row) = existing; if (existing) { hasDuplicates_ = true; } } else if (existing) { // Semijoin or a known unique build side ignores a repeat of a key. return false; } table_[index] = row; return !existing; } template <bool ignoreNullKeys> void HashTable<ignoreNullKeys>::pushNext(char* row, char* next) { if (nextOffset_) { hasDuplicates_ = true; auto previousNext = nextRow(row); nextRow(row) = next; nextRow(next) = previousNext; } } template <bool ignoreNullKeys> FOLLY_ALWAYS_INLINE void HashTable<ignoreNullKeys>::buildFullProbe( ProbeState& state, uint64_t hash, char* inserted, bool extraCheck) { if (hashMode_ == HashMode::kNormalizedKey) { state.fullProbe<ProbeState::Operation::kInsert>( tags_, table_, sizeMask_, -static_cast<int32_t>(sizeof(normalized_key_t)), [&](char* group, int32_t /*row*/) { if (RowContainer::normalizedKey(group) == RowContainer::normalizedKey(inserted)) { if (nextOffset_) { pushNext(group, inserted); } return true; } return false; }, [&](int32_t /*row*/, int32_t index) { storeRowPointer(index, hash, inserted); return nullptr; }, extraCheck); } else { state.fullProbe<ProbeState::Operation::kInsert>( tags_, table_, sizeMask_, 0, [&](char* group, int32_t /*row*/) { if (compareKeys(group, inserted)) { if (nextOffset_) { pushNext(group, inserted); } return true; } return false; }, [&](int32_t /*row*/, int32_t index) { storeRowPointer(index, hash, inserted); return nullptr; }, extraCheck); } } template <bool ignoreNullKeys> void HashTable<ignoreNullKeys>::insertForJoin( char** groups, uint64_t* hashes, int32_t numGroups) { if (hashMode_ == HashMode::kNormalizedKey) { // Write the normalized key below each row. The key is only known // at the time of insert, so cannot be filled in at the time of // accumulating the build rows. for (auto i = 0; i < numGroups; ++i) { RowContainer::normalizedKey(groups[i]) = hashes[i]; hashes[i] = mixNormalizedKey(hashes[i], sizeBits_); } } // The insertable rows are in the table, all get put in the hash // table or array. if (hashMode_ == HashMode::kArray) { for (auto i = 0; i < numGroups; ++i) { auto index = hashes[i]; VELOX_CHECK_LT(index, size_); arrayPushRow(groups[i], index); } return; } ProbeState state1; for (auto i = 0; i < numGroups; ++i) { state1.preProbe(tags_, sizeMask_, hashes[i], i); state1.firstProbe(table_, 0); buildFullProbe(state1, hashes[i], groups[i], i); } } template <bool ignoreNullKeys> void HashTable<ignoreNullKeys>::rehash() { constexpr int32_t kHashBatchSize = 1024; // @lint-ignore CLANGTIDY raw_vector<uint64_t> hashes; hashes.resize(kHashBatchSize); char* groups[kHashBatchSize]; // A join build can have multiple payload tables. Loop over 'this' // and the possible other tables and put all the data in the table // of 'this'. for (int32_t i = 0; i <= otherTables_.size(); ++i) { RowContainerIterator iterator; int32_t numGroups; do { numGroups = (i == 0 ? this : otherTables_[i - 1].get()) ->rows() ->listRows(&iterator, kHashBatchSize, groups); if (!insertBatch(groups, numGroups, hashes)) { VELOX_CHECK(hashMode_ != HashMode::kHash); setHashMode(HashMode::kHash, 0); return; } } while (numGroups > 0); } } template <bool ignoreNullKeys> void HashTable<ignoreNullKeys>::setHashMode(HashMode mode, int32_t numNew) { VELOX_CHECK(hashMode_ != HashMode::kHash); if (mode == HashMode::kArray) { auto bytes = size_ * sizeof(char*); constexpr auto kPageSize = memory::MappedMemory::kPageSize; auto numPages = bits::roundUp(bytes, kPageSize) / kPageSize; if (!rows_->mappedMemory()->allocateContiguous( numPages, nullptr, tableAllocation_)) { VELOX_FAIL("Could not allocate array for array mode hash table"); } table_ = tableAllocation_.data<char*>(); memset(table_, 0, bytes); hashMode_ = HashMode::kArray; rehash(); } else if (mode == HashMode::kHash) { hashMode_ = HashMode::kHash; for (auto& hasher : hashers_) { hasher->resetStats(); } rows_->disableNormalizedKeys(); size_ = 0; // Makes tables of the right size and rehashes. checkSize(numNew); } else if (mode == HashMode::kNormalizedKey) { hashMode_ = HashMode::kNormalizedKey; size_ = 0; // Makes tables of the right size and rehashes. checkSize(numNew); } } template <bool ignoreNullKeys> bool HashTable<ignoreNullKeys>::analyze() { constexpr int32_t kHashBatchSize = 1024; // @lint-ignore CLANGTIDY char* groups[kHashBatchSize]; RowContainerIterator iterator; int32_t numGroups; do { numGroups = rows_->listRows(&iterator, kHashBatchSize, groups); for (int32_t i = 0; i < hashers_.size(); ++i) { auto& hasher = hashers_[i]; if (!hasher->isRange()) { // A range mode hasher does not know distincts, so need to // look. A distinct mode one does know the range. A hash join // build is always analyzed. continue; } uint64_t rangeSize; uint64_t distinctSize; hasher->cardinality(rangeSize, distinctSize); if (distinctSize == VectorHasher::kRangeTooLarge && rangeSize == VectorHasher::kRangeTooLarge) { return false; } RowColumn column = rows_->columnAt(i); hasher->analyze( groups, numGroups, column.offset(), ignoreNullKeys ? 0 : column.nullByte(), ignoreNullKeys ? 0 : column.nullMask()); } } while (numGroups > 0); return true; } namespace { // Multiplies a * b and produces uint64_t max to denote overflow. If // either a or b is overflow, preserves overflow. inline uint64_t safeMul(uint64_t a, uint64_t b) { constexpr uint64_t kMax = std::numeric_limits<uint64_t>::max(); if (a == kMax || b == kMax) { return kMax; } uint64_t result; if (__builtin_mul_overflow(a, b, &result)) { return kMax; } return result; } } // namespace template <bool ignoreNullKeys> void HashTable<ignoreNullKeys>::enableRangeWhereCan( const std::vector<uint64_t>& rangeSizes, const std::vector<uint64_t>& distinctSizes, std::vector<bool>& useRange) { // Sort non-range keys by the cardinality increase going from distinct to // range. std::vector<size_t> indices(rangeSizes.size()); std::vector<uint64_t> rangeMultipliers( rangeSizes.size(), std::numeric_limits<uint64_t>::max()); for (auto i = 0; i < rangeSizes.size(); i++) { indices[i] = i; if (!useRange[i]) { rangeMultipliers[i] = rangeSizes[i] / distinctSizes[i]; } } std::sort(indices.begin(), indices.end(), [&](auto i, auto j) { return rangeMultipliers[i] < rangeMultipliers[j]; }); auto calculateNewMultipler = [&]() { uint64_t multipler = 1; for (auto i = 0; i < rangeSizes.size(); ++i) { auto kind = hashers_[i]->typeKind(); // NOLINT multipler = safeMul( multipler, addReserve(useRange[i] ? rangeSizes[i] : distinctSizes[i], kind)); } return multipler; }; // Switch distinct to range if the cardinality increase does not overflow // 64 bits. for (auto i = 0; i < rangeSizes.size(); ++i) { if (!useRange[indices[i]]) { useRange[indices[i]] = true; auto newProduct = calculateNewMultipler(); if (newProduct == VectorHasher::kRangeTooLarge) { useRange[indices[i]] = false; return; } } } } // Returns the total number of values to reserve given a distinct // count or a range size and a type. The maximum size is the // cardinality of the type + 1 to indicate null. template <bool ignoreNullKeys> int64_t HashTable<ignoreNullKeys>::addReserve(int64_t count, TypeKind kind) { if (isJoinBuild_) { return count; } if (count >= VectorHasher::kRangeTooLarge / 2) { return count; } auto range = count + (count / 2); switch (kind) { case TypeKind::BOOLEAN: return 3; case TypeKind::TINYINT: return std::min<int64_t>(range, 257); case TypeKind::SMALLINT: return std::min<int64_t>(range, 0x10001); case TypeKind::INTEGER: return std::min<int64_t>(range, 0x10000001); default: return range; } } template <bool ignoreNullKeys> uint64_t HashTable<ignoreNullKeys>::setHasherMode( const std::vector<std::unique_ptr<VectorHasher>>& hashers, const std::vector<bool>& useRange, const std::vector<uint64_t>& rangeSizes, const std::vector<uint64_t>& distinctSizes) { uint64_t multiplier = 1; for (int i = 0; i < hashers.size(); ++i) { auto kind = hashers[i]->typeKind(); multiplier = useRange.size() > i && useRange[i] ? hashers[i]->enableValueRange( multiplier, addReserve(rangeSizes[i], kind) - rangeSizes[i]) : hashers[i]->enableValueIds( multiplier, addReserve(distinctSizes[i], kind) - distinctSizes[i]); VELOX_CHECK_NE(multiplier, VectorHasher::kRangeTooLarge); } return multiplier; } template <bool ignoreNullKeys> void HashTable<ignoreNullKeys>::clearUseRange(std::vector<bool>& useRange) { for (auto i = 0; i < hashers_.size(); ++i) { useRange[i] = hashers_[i]->typeKind() == TypeKind::BOOLEAN; } } template <bool ignoreNullKeys> void HashTable<ignoreNullKeys>::decideHashMode(int32_t numNew) { std::vector<uint64_t> rangeSizes(hashers_.size()); std::vector<uint64_t> distinctSizes(hashers_.size()); std::vector<bool> useRange(hashers_.size()); uint64_t bestWithReserve = 1; uint64_t distinctsWithReserve = 1; uint64_t rangesWithReserve = 1; if (numDistinct_ && !isJoinBuild_) { if (!analyze()) { setHashMode(HashMode::kHash, numNew); return; } } for (int i = 0; i < hashers_.size(); ++i) { auto kind = hashers_[i]->typeKind(); hashers_[i]->cardinality(rangeSizes[i], distinctSizes[i]); distinctsWithReserve = safeMul(distinctsWithReserve, addReserve(distinctSizes[i], kind)); rangesWithReserve = safeMul(rangesWithReserve, addReserve(rangeSizes[i], kind)); if (distinctSizes[i] == VectorHasher::kRangeTooLarge && rangeSizes[i] != VectorHasher::kRangeTooLarge) { useRange[i] = true; bestWithReserve = safeMul(bestWithReserve, addReserve(rangeSizes[i], kind)); } else if ( rangeSizes[i] != VectorHasher::kRangeTooLarge && rangeSizes[i] <= distinctSizes[i] * 20) { useRange[i] = true; bestWithReserve = safeMul(bestWithReserve, addReserve(rangeSizes[i], kind)); } else { bestWithReserve = safeMul(bestWithReserve, addReserve(distinctSizes[i], kind)); } } if (rangesWithReserve < kArrayHashMaxSize) { std::fill(useRange.begin(), useRange.end(), true); size_ = setHasherMode(hashers_, useRange, rangeSizes, distinctSizes); setHashMode(HashMode::kArray, numNew); return; } if (bestWithReserve < kArrayHashMaxSize) { size_ = setHasherMode(hashers_, useRange, rangeSizes, distinctSizes); setHashMode(HashMode::kArray, numNew); return; } if (rangesWithReserve != VectorHasher::kRangeTooLarge) { std::fill(useRange.begin(), useRange.end(), true); setHasherMode(hashers_, useRange, rangeSizes, distinctSizes); setHashMode(HashMode::kNormalizedKey, numNew); return; } if (hashers_.size() == 1 && distinctsWithReserve > 10000) { // A single part group by that does not go by range or become an array does // not make sense as a normalized key unless it is very small. setHashMode(HashMode::kHash, numNew); return; } if (distinctsWithReserve < kArrayHashMaxSize) { clearUseRange(useRange); size_ = setHasherMode(hashers_, useRange, rangeSizes, distinctSizes); setHashMode(HashMode::kArray, numNew); return; } if (distinctsWithReserve == VectorHasher::kRangeTooLarge && rangesWithReserve == VectorHasher::kRangeTooLarge) { setHashMode(HashMode::kHash, numNew); return; } // The key concatenation fits in 64 bits. if (bestWithReserve != VectorHasher::kRangeTooLarge) { enableRangeWhereCan(rangeSizes, distinctSizes, useRange); setHasherMode(hashers_, useRange, rangeSizes, distinctSizes); } else { clearUseRange(useRange); } setHashMode(HashMode::kNormalizedKey, numNew); } template <bool ignoreNullKeys> std::string HashTable<ignoreNullKeys>::toString() { std::stringstream out; int32_t occupied = 0; if (table_ && tableAllocation_.data() && tableAllocation_.size()) { // 'size_' and 'table_' may not be set if initializing. uint64_t size = std::min<uint64_t>(tableAllocation_.size() / sizeof(char*), size_); for (int32_t i = 0; i < size; ++i) { occupied += table_[i] != nullptr; } } out << "[HashTable size: " << size_ << " occupied: " << occupied << "]"; if (!table_) { out << "(no table) "; } for (auto& hasher : hashers_) { out << hasher->toString(); } return out.str(); } namespace { bool mayUseValueIds(const BaseHashTable& table) { if (table.hashMode() == BaseHashTable::HashMode::kHash) { return false; } for (auto& hasher : table.hashers()) { if (!hasher->mayUseValueIds()) { return false; } } return true; } } // namespace template <bool ignoreNullKeys> void HashTable<ignoreNullKeys>::prepareJoinTable( std::vector<std::unique_ptr<BaseHashTable>> tables) { otherTables_.reserve(tables.size()); for (auto& table : tables) { otherTables_.emplace_back(std::unique_ptr<HashTable<ignoreNullKeys>>( dynamic_cast<HashTable<ignoreNullKeys>*>(table.release()))); } bool useValueIds = mayUseValueIds(*this); if (useValueIds) { for (auto& other : otherTables_) { if (!mayUseValueIds(*other)) { useValueIds = false; break; } } if (useValueIds) { for (auto& other : otherTables_) { for (auto i = 0; i < hashers_.size(); ++i) { hashers_[i]->merge(*other->hashers_[i]); if (!hashers_[i]->mayUseValueIds()) { useValueIds = false; break; } } if (!useValueIds) { break; } } } } numDistinct_ = rows()->numRows(); for (auto& other : otherTables_) { numDistinct_ += other->rows()->numRows(); } if (!useValueIds) { if (hashMode_ != HashMode::kHash) { setHashMode(HashMode::kHash, 0); } else { checkSize(0); } } else { decideHashMode(0); } } template <bool ignoreNullKeys> int32_t HashTable<ignoreNullKeys>::listJoinResults( JoinResultIterator& iter, bool includeMisses, folly::Range<vector_size_t*> inputRows, folly::Range<char**> hits) { VELOX_CHECK_LE(inputRows.size(), hits.size()); int numOut = 0; auto maxOut = inputRows.size(); while (iter.lastRowIndex < iter.rows->size()) { if (!iter.nextHit) { auto row = (*iter.rows)[iter.lastRowIndex]; iter.nextHit = (*iter.hits)[row]; // NOLINT if (!iter.nextHit) { ++iter.lastRowIndex; if (includeMisses) { inputRows[numOut] = row; // NOLINT hits[numOut] = nullptr; ++numOut; if (numOut >= maxOut) { return numOut; } } continue; } } while (iter.nextHit) { char* next = nullptr; if (nextOffset_) { next = nextRow(iter.nextHit); __builtin_prefetch(reinterpret_cast<char*>(next) + nextOffset_); } inputRows[numOut] = (*iter.rows)[iter.lastRowIndex]; // NOLINT hits[numOut] = iter.nextHit; ++numOut; iter.nextHit = next; if (!iter.nextHit) { ++iter.lastRowIndex; } if (numOut >= maxOut) { return numOut; } } } return numOut; } template <bool ignoreNullKeys> int32_t HashTable<ignoreNullKeys>::listNotProbedRows( NotProbedRowsIterator* iter, int32_t maxRows, uint64_t maxBytes, char** rows) { if (iter->hashTableIndex_ == -1) { auto numRows = rows_->listNotProbedRows( &iter->rowContainerIterator_, maxRows, maxBytes, rows); if (numRows) { return numRows; } iter->hashTableIndex_ = 0; iter->rowContainerIterator_.reset(); } while (iter->hashTableIndex_ < otherTables_.size()) { auto numRows = otherTables_[iter->hashTableIndex_]->rows()->listNotProbedRows( &iter->rowContainerIterator_, maxRows, maxBytes, rows); if (numRows) { return numRows; } ++iter->hashTableIndex_; iter->rowContainerIterator_.reset(); } return 0; } template <bool ignoreNullKeys> void HashTable<ignoreNullKeys>::erase(folly::Range<char**> rows) { auto numRows = rows.size(); raw_vector<uint64_t> hashes; hashes.resize(numRows); for (int32_t i = 0; i < hashers_.size(); ++i) { auto& hasher = hashers_[i]; if (hashMode_ == HashMode::kHash) { rows_->hash(i, rows, i > 0, hashes.data()); } else { auto column = rows_->columnAt(i); if (!hasher->computeValueIdsForRows( rows.data(), numRows, column.offset(), column.nullByte(), ignoreNullKeys ? 0 : column.nullMask(), hashes)) { VELOX_FAIL("Value ids in erase must exist for all keys"); } } } eraseWithHashes(rows, hashes.data()); } template <bool ignoreNullKeys> void HashTable<ignoreNullKeys>::eraseWithHashes( folly::Range<char**> rows, uint64_t* hashes) { auto numRows = rows.size(); if (hashMode_ == HashMode::kArray) { for (auto i = 0; i < numRows; ++i) { DCHECK(hashes[i] < size_); table_[hashes[i]] = nullptr; } } else { if (hashMode_ == HashMode::kNormalizedKey) { for (auto i = 0; i < numRows; ++i) { hashes[i] = mixNormalizedKey(hashes[i], sizeBits_); } } ProbeState state; for (auto i = 0; i < numRows; ++i) { state.preProbe(tags_, sizeMask_, hashes[i], i); state.firstProbe<ProbeState::Operation::kErase>(table_, 0); state.fullProbe<ProbeState::Operation::kErase>( tags_, table_, sizeMask_, 0, [&](const char* group, int32_t row) { return rows[row] == group; }, [&](int32_t /*index*/, int32_t /*row*/) { return nullptr; }, false); } } numDistinct_ -= numRows; rows_->eraseRows(rows); } template class HashTable<true>; template class HashTable<false>; } // namespace facebook::velox::exec