theta/include/theta_sketch_impl.hpp (758 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #ifndef THETA_SKETCH_IMPL_HPP_ #define THETA_SKETCH_IMPL_HPP_ #include <sstream> #include <vector> #include <stdexcept> #include "binomial_bounds.hpp" #include "theta_helpers.hpp" #include "count_zeros.hpp" #include "bit_packing.hpp" #include "memory_operations.hpp" namespace datasketches { template<typename A> bool base_theta_sketch_alloc<A>::is_estimation_mode() const { return get_theta64() < theta_constants::MAX_THETA && !is_empty(); } template<typename A> double base_theta_sketch_alloc<A>::get_theta() const { return static_cast<double>(get_theta64()) / static_cast<double>(theta_constants::MAX_THETA); } template<typename A> double base_theta_sketch_alloc<A>::get_estimate() const { return get_num_retained() / get_theta(); } template<typename A> double base_theta_sketch_alloc<A>::get_lower_bound(uint8_t num_std_devs) const { if (!is_estimation_mode()) return get_num_retained(); return binomial_bounds::get_lower_bound(get_num_retained(), get_theta(), num_std_devs); } template<typename A> double base_theta_sketch_alloc<A>::get_upper_bound(uint8_t num_std_devs) const { if (!is_estimation_mode()) return get_num_retained(); return binomial_bounds::get_upper_bound(get_num_retained(), get_theta(), num_std_devs); } template<typename A> string<A> base_theta_sketch_alloc<A>::to_string(bool print_details) const { // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements. // The stream does not support passing an allocator instance, and alternatives are complicated. std::ostringstream os; os << "### Theta sketch summary:" << std::endl; os << " num retained entries : " << this->get_num_retained() << std::endl; os << " seed hash : " << this->get_seed_hash() << std::endl; os << " empty? : " << (this->is_empty() ? "true" : "false") << std::endl; os << " ordered? : " << (this->is_ordered() ? "true" : "false") << std::endl; os << " estimation mode? : " << (this->is_estimation_mode() ? "true" : "false") << std::endl; os << " theta (fraction) : " << this->get_theta() << std::endl; os << " theta (raw 64-bit) : " << this->get_theta64() << std::endl; os << " estimate : " << this->get_estimate() << std::endl; os << " lower bound 95% conf : " << this->get_lower_bound(2) << std::endl; os << " upper bound 95% conf : " << this->get_upper_bound(2) << std::endl; print_specifics(os); os << "### End sketch summary" << std::endl; if (print_details) { print_items(os); } return string<A>(os.str().c_str(), this->get_allocator()); } template<typename A> void theta_sketch_alloc<A>::print_items(std::ostringstream& os) const { os << "### Retained entries" << std::endl; for (const auto& hash: *this) { os << hash << std::endl; } os << "### End retained entries" << std::endl; } // update sketch template<typename A> update_theta_sketch_alloc<A>::update_theta_sketch_alloc(uint8_t lg_cur_size, uint8_t lg_nom_size, resize_factor rf, float p, uint64_t theta, uint64_t seed, const A& allocator): table_(lg_cur_size, lg_nom_size, rf, p, theta, seed, allocator) {} template<typename A> A update_theta_sketch_alloc<A>::get_allocator() const { return table_.allocator_; } template<typename A> bool update_theta_sketch_alloc<A>::is_empty() const { return table_.is_empty_; } template<typename A> bool update_theta_sketch_alloc<A>::is_ordered() const { return table_.num_entries_ > 1 ? false : true; } template<typename A> uint64_t update_theta_sketch_alloc<A>::get_theta64() const { return is_empty() ? theta_constants::MAX_THETA : table_.theta_; } template<typename A> uint32_t update_theta_sketch_alloc<A>::get_num_retained() const { return table_.num_entries_; } template<typename A> uint16_t update_theta_sketch_alloc<A>::get_seed_hash() const { return compute_seed_hash(table_.seed_); } template<typename A> uint8_t update_theta_sketch_alloc<A>::get_lg_k() const { return table_.lg_nom_size_; } template<typename A> auto update_theta_sketch_alloc<A>::get_rf() const -> resize_factor { return table_.rf_; } template<typename A> void update_theta_sketch_alloc<A>::update(uint64_t value) { update(&value, sizeof(value)); } template<typename A> void update_theta_sketch_alloc<A>::update(int64_t value) { update(&value, sizeof(value)); } template<typename A> void update_theta_sketch_alloc<A>::update(uint32_t value) { update(static_cast<int32_t>(value)); } template<typename A> void update_theta_sketch_alloc<A>::update(int32_t value) { update(static_cast<int64_t>(value)); } template<typename A> void update_theta_sketch_alloc<A>::update(uint16_t value) { update(static_cast<int16_t>(value)); } template<typename A> void update_theta_sketch_alloc<A>::update(int16_t value) { update(static_cast<int64_t>(value)); } template<typename A> void update_theta_sketch_alloc<A>::update(uint8_t value) { update(static_cast<int8_t>(value)); } template<typename A> void update_theta_sketch_alloc<A>::update(int8_t value) { update(static_cast<int64_t>(value)); } template<typename A> void update_theta_sketch_alloc<A>::update(double value) { update(canonical_double(value)); } template<typename A> void update_theta_sketch_alloc<A>::update(float value) { update(static_cast<double>(value)); } template<typename A> void update_theta_sketch_alloc<A>::update(const std::string& value) { if (value.empty()) return; update(value.c_str(), value.length()); } template<typename A> void update_theta_sketch_alloc<A>::update(const void* data, size_t length) { const uint64_t hash = table_.hash_and_screen(data, length); if (hash == 0) return; auto result = table_.find(hash); if (!result.second) { table_.insert(result.first, hash); } } template<typename A> void update_theta_sketch_alloc<A>::trim() { table_.trim(); } template<typename A> void update_theta_sketch_alloc<A>::reset() { table_.reset(); } template<typename A> auto update_theta_sketch_alloc<A>::begin() -> iterator { return iterator(table_.entries_, 1 << table_.lg_cur_size_, 0); } template<typename A> auto update_theta_sketch_alloc<A>::end() -> iterator { return iterator(nullptr, 0, 1 << table_.lg_cur_size_); } template<typename A> auto update_theta_sketch_alloc<A>::begin() const -> const_iterator { return const_iterator(table_.entries_, 1 << table_.lg_cur_size_, 0); } template<typename A> auto update_theta_sketch_alloc<A>::end() const -> const_iterator { return const_iterator(nullptr, 0, 1 << table_.lg_cur_size_); } template<typename A> compact_theta_sketch_alloc<A> update_theta_sketch_alloc<A>::compact(bool ordered) const { return compact_theta_sketch_alloc<A>(*this, ordered); } template<typename A> void update_theta_sketch_alloc<A>::print_specifics(std::ostringstream& os) const { os << " lg nominal size : " << static_cast<int>(table_.lg_nom_size_) << std::endl; os << " lg current size : " << static_cast<int>(table_.lg_cur_size_) << std::endl; os << " resize factor : " << (1 << table_.rf_) << std::endl; } // builder template<typename A> update_theta_sketch_alloc<A>::builder::builder(const A& allocator): theta_base_builder<builder, A>(allocator) {} template<typename A> update_theta_sketch_alloc<A> update_theta_sketch_alloc<A>::builder::build() const { return update_theta_sketch_alloc(this->starting_lg_size(), this->lg_k_, this->rf_, this->p_, this->starting_theta(), this->seed_, this->allocator_); } // compact sketch template<typename A> template<typename Other> compact_theta_sketch_alloc<A>::compact_theta_sketch_alloc(const Other& other, bool ordered): is_empty_(other.is_empty()), is_ordered_(other.is_ordered() || ordered), seed_hash_(other.get_seed_hash()), theta_(other.get_theta64()), entries_(other.get_allocator()) { if (!other.is_empty()) { entries_.reserve(other.get_num_retained()); std::copy(other.begin(), other.end(), std::back_inserter(entries_)); if (ordered && !other.is_ordered()) std::sort(entries_.begin(), entries_.end()); } } template<typename A> compact_theta_sketch_alloc<A>::compact_theta_sketch_alloc(bool is_empty, bool is_ordered, uint16_t seed_hash, uint64_t theta, std::vector<uint64_t, A>&& entries): is_empty_(is_empty), is_ordered_(is_ordered || (entries.size() <= 1ULL)), seed_hash_(seed_hash), theta_(theta), entries_(std::move(entries)) {} template<typename A> A compact_theta_sketch_alloc<A>::get_allocator() const { return entries_.get_allocator(); } template<typename A> bool compact_theta_sketch_alloc<A>::is_empty() const { return is_empty_; } template<typename A> bool compact_theta_sketch_alloc<A>::is_ordered() const { return is_ordered_; } template<typename A> uint64_t compact_theta_sketch_alloc<A>::get_theta64() const { return theta_; } template<typename A> uint32_t compact_theta_sketch_alloc<A>::get_num_retained() const { return static_cast<uint32_t>(entries_.size()); } template<typename A> uint16_t compact_theta_sketch_alloc<A>::get_seed_hash() const { return seed_hash_; } template<typename A> auto compact_theta_sketch_alloc<A>::begin() -> iterator { return iterator(entries_.data(), static_cast<uint32_t>(entries_.size()), 0); } template<typename A> auto compact_theta_sketch_alloc<A>::end() -> iterator { return iterator(nullptr, 0, static_cast<uint32_t>(entries_.size())); } template<typename A> auto compact_theta_sketch_alloc<A>::begin() const -> const_iterator { return const_iterator(entries_.data(), static_cast<uint32_t>(entries_.size()), 0); } template<typename A> auto compact_theta_sketch_alloc<A>::end() const -> const_iterator { return const_iterator(nullptr, 0, static_cast<uint32_t>(entries_.size())); } template<typename A> void compact_theta_sketch_alloc<A>::print_specifics(std::ostringstream&) const {} template<typename A> uint8_t compact_theta_sketch_alloc<A>::get_preamble_longs(bool compressed) const { if (compressed) { return this->is_estimation_mode() ? 2 : 1; } return this->is_estimation_mode() ? 3 : this->is_empty() || entries_.size() == 1 ? 1 : 2; } template<typename A> size_t compact_theta_sketch_alloc<A>::get_max_serialized_size_bytes(uint8_t lg_k) { return sizeof(uint64_t) * (3 + update_theta_sketch_alloc<A>::theta_table::get_capacity(lg_k + 1, lg_k)); } template<typename A> size_t compact_theta_sketch_alloc<A>::get_serialized_size_bytes(bool compressed) const { if (compressed && is_suitable_for_compression()) { return get_compressed_serialized_size_bytes(compute_entry_bits(), get_num_entries_bytes()); } return sizeof(uint64_t) * get_preamble_longs(false) + sizeof(uint64_t) * entries_.size(); } // store num_entries as whole bytes since whole-byte blocks will follow (most probably) template<typename A> uint8_t compact_theta_sketch_alloc<A>::get_num_entries_bytes() const { return whole_bytes_to_hold_bits<uint8_t>(32 - count_leading_zeros_in_u32(static_cast<uint32_t>(entries_.size()))); } template<typename A> size_t compact_theta_sketch_alloc<A>::get_compressed_serialized_size_bytes(uint8_t entry_bits, uint8_t num_entries_bytes) const { const size_t compressed_bits = entry_bits * entries_.size(); return sizeof(uint64_t) * get_preamble_longs(true) + num_entries_bytes + whole_bytes_to_hold_bits(compressed_bits); } template<typename A> void compact_theta_sketch_alloc<A>::serialize(std::ostream& os) const { const uint8_t preamble_longs = this->is_estimation_mode() ? 3 : this->is_empty() || entries_.size() == 1 ? 1 : 2; write(os, preamble_longs); write(os, UNCOMPRESSED_SERIAL_VERSION); write(os, SKETCH_TYPE); write<uint16_t>(os, 0); // unused const uint8_t flags_byte( (1 << flags::IS_COMPACT) | (1 << flags::IS_READ_ONLY) | (this->is_empty() ? 1 << flags::IS_EMPTY : 0) | (this->is_ordered() ? 1 << flags::IS_ORDERED : 0) ); write(os, flags_byte); write(os, get_seed_hash()); if (preamble_longs > 1) { write(os, static_cast<uint32_t>(entries_.size())); write<uint32_t>(os, 0); // unused } if (this->is_estimation_mode()) write(os, this->theta_); if (entries_.size() > 0) write(os, entries_.data(), entries_.size() * sizeof(uint64_t)); } template<typename A> auto compact_theta_sketch_alloc<A>::serialize(unsigned header_size_bytes) const -> vector_bytes { const size_t size = get_serialized_size_bytes() + header_size_bytes; vector_bytes bytes(size, 0, entries_.get_allocator()); uint8_t* ptr = bytes.data() + header_size_bytes; const uint8_t preamble_longs = get_preamble_longs(false); *ptr++ = preamble_longs; *ptr++ = UNCOMPRESSED_SERIAL_VERSION; *ptr++ = SKETCH_TYPE; ptr += sizeof(uint16_t); // unused const uint8_t flags_byte( (1 << flags::IS_COMPACT) | (1 << flags::IS_READ_ONLY) | (this->is_empty() ? 1 << flags::IS_EMPTY : 0) | (this->is_ordered() ? 1 << flags::IS_ORDERED : 0) ); *ptr++ = flags_byte; ptr += copy_to_mem(get_seed_hash(), ptr); if (preamble_longs > 1) { ptr += copy_to_mem(static_cast<uint32_t>(entries_.size()), ptr); ptr += sizeof(uint32_t); // unused } if (this->is_estimation_mode()) ptr += copy_to_mem(theta_, ptr); if (entries_.size() > 0) ptr += copy_to_mem(entries_.data(), ptr, entries_.size() * sizeof(uint64_t)); return bytes; } template<typename A> bool compact_theta_sketch_alloc<A>::is_suitable_for_compression() const { if (!this->is_ordered() || entries_.size() == 0 || (entries_.size() == 1 && !this->is_estimation_mode())) return false; return true; } template<typename A> void compact_theta_sketch_alloc<A>::serialize_compressed(std::ostream& os) const { if (is_suitable_for_compression()) return serialize_version_4(os); return serialize(os); } template<typename A> auto compact_theta_sketch_alloc<A>::serialize_compressed(unsigned header_size_bytes) const -> vector_bytes { if (is_suitable_for_compression()) return serialize_version_4(header_size_bytes); return serialize(header_size_bytes); } template<typename A> uint8_t compact_theta_sketch_alloc<A>::compute_entry_bits() const { // compression is based on leading zeros in deltas between ordered hash values // assumes ordered sketch uint64_t previous = 0; uint64_t ored = 0; for (const uint64_t entry: entries_) { const uint64_t delta = entry - previous; ored |= delta; previous = entry; } return 64 - count_leading_zeros_in_u64(ored); } template<typename A> void compact_theta_sketch_alloc<A>::serialize_version_4(std::ostream& os) const { const uint8_t preamble_longs = this->is_estimation_mode() ? 2 : 1; const uint8_t entry_bits = compute_entry_bits(); const uint8_t num_entries_bytes = get_num_entries_bytes(); write(os, preamble_longs); write(os, COMPRESSED_SERIAL_VERSION); write(os, SKETCH_TYPE); write(os, entry_bits); write(os, num_entries_bytes); const uint8_t flags_byte( (1 << flags::IS_COMPACT) | (1 << flags::IS_READ_ONLY) | (1 << flags::IS_ORDERED) ); write(os, flags_byte); write(os, get_seed_hash()); if (this->is_estimation_mode()) write(os, this->theta_); uint32_t num_entries = static_cast<uint32_t>(entries_.size()); for (unsigned i = 0; i < num_entries_bytes; ++i) { write<uint8_t>(os, num_entries & 0xff); num_entries >>= 8; } uint64_t previous = 0; uint64_t deltas[8]; vector_bytes buffer(entry_bits, 0, entries_.get_allocator()); // block of 8 entries takes entry_bits bytes // pack blocks of 8 deltas unsigned i; for (i = 0; i + 7 < entries_.size(); i += 8) { for (unsigned j = 0; j < 8; ++j) { deltas[j] = entries_[i + j] - previous; previous = entries_[i + j]; } pack_bits_block8(deltas, buffer.data(), entry_bits); write(os, buffer.data(), buffer.size()); } // pack extra deltas if fewer than 8 of them left if (i < entries_.size()) { uint8_t offset = 0; uint8_t* ptr = buffer.data(); for (; i < entries_.size(); ++i) { const uint64_t delta = entries_[i] - previous; previous = entries_[i]; offset = pack_bits(delta, entry_bits, ptr, offset); } if (offset > 0) ++ptr; write(os, buffer.data(), ptr - buffer.data()); } } template<typename A> auto compact_theta_sketch_alloc<A>::serialize_version_4(unsigned header_size_bytes) const -> vector_bytes { const uint8_t entry_bits = compute_entry_bits(); const uint8_t num_entries_bytes = get_num_entries_bytes(); const size_t size = get_compressed_serialized_size_bytes(entry_bits, num_entries_bytes) + header_size_bytes; vector_bytes bytes(size, 0, entries_.get_allocator()); uint8_t* ptr = bytes.data() + header_size_bytes; *ptr++ = get_preamble_longs(true); *ptr++ = COMPRESSED_SERIAL_VERSION; *ptr++ = SKETCH_TYPE; *ptr++ = entry_bits; *ptr++ = num_entries_bytes; const uint8_t flags_byte( (1 << flags::IS_COMPACT) | (1 << flags::IS_READ_ONLY) | (1 << flags::IS_ORDERED) ); *ptr++ = flags_byte; ptr += copy_to_mem(get_seed_hash(), ptr); if (this->is_estimation_mode()) { ptr += copy_to_mem(theta_, ptr); } uint32_t num_entries = static_cast<uint32_t>(entries_.size()); for (unsigned i = 0; i < num_entries_bytes; ++i) { *ptr++ = num_entries & 0xff; num_entries >>= 8; } uint64_t previous = 0; uint64_t deltas[8]; // pack blocks of 8 deltas unsigned i; for (i = 0; i + 7 < entries_.size(); i += 8) { for (unsigned j = 0; j < 8; ++j) { deltas[j] = entries_[i + j] - previous; previous = entries_[i + j]; } pack_bits_block8(deltas, ptr, entry_bits); ptr += entry_bits; } // pack extra deltas if fewer than 8 of them left uint8_t offset = 0; for (; i < entries_.size(); ++i) { const uint64_t delta = entries_[i] - previous; previous = entries_[i]; offset = pack_bits(delta, entry_bits, ptr, offset); } return bytes; } template<typename A> compact_theta_sketch_alloc<A> compact_theta_sketch_alloc<A>::deserialize(std::istream& is, uint64_t seed, const A& allocator) { const auto preamble_longs = read<uint8_t>(is); const auto serial_version = read<uint8_t>(is); const auto type = read<uint8_t>(is); checker<true>::check_sketch_type(type, SKETCH_TYPE); switch (serial_version) { case 4: return deserialize_v4(preamble_longs, is, seed, allocator); case 3: return deserialize_v3(preamble_longs, is, seed, allocator); case 1: return deserialize_v1(preamble_longs, is, seed, allocator); case 2: return deserialize_v2(preamble_longs, is, seed, allocator); default: throw std::invalid_argument("unexpected sketch serialization version " + std::to_string(serial_version)); } } template<typename A> compact_theta_sketch_alloc<A> compact_theta_sketch_alloc<A>::deserialize_v1( uint8_t, std::istream& is, uint64_t seed, const A& allocator) { const auto seed_hash = compute_seed_hash(seed); read<uint8_t>(is); // unused read<uint32_t>(is); // unused const auto num_entries = read<uint32_t>(is); read<uint32_t>(is); //unused const auto theta = read<uint64_t>(is); std::vector<uint64_t, A> entries(num_entries, 0, allocator); bool is_empty = (num_entries == 0) && (theta == theta_constants::MAX_THETA); if (!is_empty) read(is, entries.data(), sizeof(uint64_t) * entries.size()); if (!is.good()) throw std::runtime_error("error reading from std::istream"); return compact_theta_sketch_alloc(is_empty, true, seed_hash, theta, std::move(entries)); } template<typename A> compact_theta_sketch_alloc<A> compact_theta_sketch_alloc<A>::deserialize_v2( uint8_t preamble_longs, std::istream& is, uint64_t seed, const A& allocator) { read<uint8_t>(is); // unused read<uint16_t>(is); // unused const uint16_t seed_hash = read<uint16_t>(is); checker<true>::check_seed_hash(seed_hash, compute_seed_hash(seed)); if (preamble_longs == 1) { if (!is.good()) throw std::runtime_error("error reading from std::istream"); std::vector<uint64_t, A> entries(0, 0, allocator); return compact_theta_sketch_alloc(true, true, seed_hash, theta_constants::MAX_THETA, std::move(entries)); } else if (preamble_longs == 2) { const uint32_t num_entries = read<uint32_t>(is); read<uint32_t>(is); // unused std::vector<uint64_t, A> entries(num_entries, 0, allocator); if (num_entries == 0) { return compact_theta_sketch_alloc(true, true, seed_hash, theta_constants::MAX_THETA, std::move(entries)); } read(is, entries.data(), entries.size() * sizeof(uint64_t)); if (!is.good()) throw std::runtime_error("error reading from std::istream"); return compact_theta_sketch_alloc(false, true, seed_hash, theta_constants::MAX_THETA, std::move(entries)); } else if (preamble_longs == 3) { const uint32_t num_entries = read<uint32_t>(is); read<uint32_t>(is); // unused const auto theta = read<uint64_t>(is); bool is_empty = (num_entries == 0) && (theta == theta_constants::MAX_THETA); std::vector<uint64_t, A> entries(num_entries, 0, allocator); if (is_empty) { if (!is.good()) throw std::runtime_error("error reading from std::istream"); return compact_theta_sketch_alloc(true, true, seed_hash, theta, std::move(entries)); } else { read(is, entries.data(), sizeof(uint64_t) * entries.size()); if (!is.good()) throw std::runtime_error("error reading from std::istream"); return compact_theta_sketch_alloc(false, true, seed_hash, theta, std::move(entries)); } } else { throw std::invalid_argument(std::to_string(preamble_longs) + " longs of premable, but expected 1, 2, or 3"); } } template<typename A> compact_theta_sketch_alloc<A> compact_theta_sketch_alloc<A>::deserialize_v3( uint8_t preamble_longs, std::istream& is, uint64_t seed, const A& allocator) { read<uint16_t>(is); // unused const auto flags_byte = read<uint8_t>(is); const auto seed_hash = read<uint16_t>(is); const bool is_empty = flags_byte & (1 << flags::IS_EMPTY); if (!is_empty) checker<true>::check_seed_hash(seed_hash, compute_seed_hash(seed)); uint64_t theta = theta_constants::MAX_THETA; uint32_t num_entries = 0; if (!is_empty) { if (preamble_longs == 1) { num_entries = 1; } else { num_entries = read<uint32_t>(is); read<uint32_t>(is); // unused if (preamble_longs > 2) theta = read<uint64_t>(is); } } std::vector<uint64_t, A> entries(num_entries, 0, allocator); if (!is_empty) read(is, entries.data(), sizeof(uint64_t) * entries.size()); const bool is_ordered = flags_byte & (1 << flags::IS_ORDERED); if (!is.good()) throw std::runtime_error("error reading from std::istream"); return compact_theta_sketch_alloc(is_empty, is_ordered, seed_hash, theta, std::move(entries)); } template<typename A> compact_theta_sketch_alloc<A> compact_theta_sketch_alloc<A>::deserialize_v4( uint8_t preamble_longs, std::istream& is, uint64_t seed, const A& allocator) { const auto entry_bits = read<uint8_t>(is); const auto num_entries_bytes = read<uint8_t>(is); const auto flags_byte = read<uint8_t>(is); const auto seed_hash = read<uint16_t>(is); const bool is_empty = flags_byte & (1 << flags::IS_EMPTY); if (!is_empty) checker<true>::check_seed_hash(seed_hash, compute_seed_hash(seed)); uint64_t theta = theta_constants::MAX_THETA; if (preamble_longs > 1) theta = read<uint64_t>(is); uint32_t num_entries = 0; for (unsigned i = 0; i < num_entries_bytes; ++i) { num_entries |= read<uint8_t>(is) << (i << 3); } vector_bytes buffer(entry_bits, 0, allocator); // block of 8 entries takes entry_bits bytes std::vector<uint64_t, A> entries(num_entries, 0, allocator); // unpack blocks of 8 deltas unsigned i; for (i = 0; i + 7 < num_entries; i += 8) { read(is, buffer.data(), buffer.size()); unpack_bits_block8(&entries[i], buffer.data(), entry_bits); } // unpack extra deltas if fewer than 8 of them left if (i < num_entries) read(is, buffer.data(), whole_bytes_to_hold_bits((num_entries - i) * entry_bits)); if (!is.good()) throw std::runtime_error("error reading from std::istream"); const uint8_t* ptr = buffer.data(); uint8_t offset = 0; for (; i < num_entries; ++i) { offset = unpack_bits(entries[i], entry_bits, ptr, offset); } // undo deltas uint64_t previous = 0; for (i = 0; i < num_entries; ++i) { entries[i] += previous; previous = entries[i]; } const bool is_ordered = flags_byte & (1 << flags::IS_ORDERED); return compact_theta_sketch_alloc(is_empty, is_ordered, seed_hash, theta, std::move(entries)); } template<typename A> compact_theta_sketch_alloc<A> compact_theta_sketch_alloc<A>::deserialize(const void* bytes, size_t size, uint64_t seed, const A& allocator) { auto data = compact_theta_sketch_parser<true>::parse(bytes, size, seed, false); if (data.entry_bits == 64) { // versions 1 to 3 const uint64_t* entries = reinterpret_cast<const uint64_t*>(data.entries_start_ptr); return compact_theta_sketch_alloc(data.is_empty, data.is_ordered, data.seed_hash, data.theta, std::vector<uint64_t, A>(entries, entries + data.num_entries, allocator)); } else { // version 4 std::vector<uint64_t, A> entries(data.num_entries, 0, allocator); const uint8_t* ptr = reinterpret_cast<const uint8_t*>(data.entries_start_ptr); // unpack blocks of 8 deltas unsigned i; for (i = 0; i + 7 < data.num_entries; i += 8) { unpack_bits_block8(&entries[i], ptr, data.entry_bits); ptr += data.entry_bits; } // unpack extra deltas if fewer than 8 of them left uint8_t offset = 0; for (; i < data.num_entries; ++i) { offset = unpack_bits(entries[i], data.entry_bits, ptr, offset); } // undo deltas uint64_t previous = 0; for (i = 0; i < data.num_entries; ++i) { entries[i] += previous; previous = entries[i]; } return compact_theta_sketch_alloc(data.is_empty, data.is_ordered, data.seed_hash, data.theta, std::move(entries)); } } // wrapped compact sketch template<typename A> wrapped_compact_theta_sketch_alloc<A>::wrapped_compact_theta_sketch_alloc(const data_type& data): data_(data) {} template<typename A> const wrapped_compact_theta_sketch_alloc<A> wrapped_compact_theta_sketch_alloc<A>::wrap(const void* bytes, size_t size, uint64_t seed, bool dump_on_error) { return wrapped_compact_theta_sketch_alloc(compact_theta_sketch_parser<true>::parse(bytes, size, seed, dump_on_error)); } template<typename A> A wrapped_compact_theta_sketch_alloc<A>::get_allocator() const { return A(); } template<typename A> bool wrapped_compact_theta_sketch_alloc<A>::is_empty() const { return data_.is_empty; } template<typename A> bool wrapped_compact_theta_sketch_alloc<A>::is_ordered() const { return data_.is_ordered; } template<typename A> uint64_t wrapped_compact_theta_sketch_alloc<A>::get_theta64() const { return data_.theta; } template<typename A> uint32_t wrapped_compact_theta_sketch_alloc<A>::get_num_retained() const { return data_.num_entries; } template<typename A> uint16_t wrapped_compact_theta_sketch_alloc<A>::get_seed_hash() const { return data_.seed_hash; } template<typename A> auto wrapped_compact_theta_sketch_alloc<A>::begin() const -> const_iterator { return const_iterator(data_.entries_start_ptr, data_.entry_bits, data_.num_entries, 0); } template<typename A> auto wrapped_compact_theta_sketch_alloc<A>::end() const -> const_iterator { return const_iterator(data_.entries_start_ptr, data_.entry_bits, data_.num_entries, data_.num_entries); } template<typename A> void wrapped_compact_theta_sketch_alloc<A>::print_specifics(std::ostringstream&) const {} template<typename A> void wrapped_compact_theta_sketch_alloc<A>::print_items(std::ostringstream& os) const { os << "### Retained entries" << std::endl; for (const auto hash: *this) { os << hash << std::endl; } os << "### End retained entries" << std::endl; } // assumes index == 0 or index == num_entries template<typename Allocator> wrapped_compact_theta_sketch_alloc<Allocator>::const_iterator::const_iterator( const void* ptr, uint8_t entry_bits, uint32_t num_entries, uint32_t index): ptr_(ptr), entry_bits_(entry_bits), num_entries_(num_entries), index_(index), previous_(0), is_block_mode_(num_entries_ >= 8), offset_(0) { if (entry_bits == 64) { // no compression ptr_ = reinterpret_cast<const uint64_t*>(ptr) + index; } else if (index < num_entries) { if (is_block_mode_) { unpack8(); } else { unpack1(); } } } template<typename Allocator> auto wrapped_compact_theta_sketch_alloc<Allocator>::const_iterator::operator++() -> const_iterator& { if (entry_bits_ == 64) { // no compression ptr_ = reinterpret_cast<const uint64_t*>(ptr_) + 1; return *this; } if (++index_ < num_entries_) { if (is_block_mode_) { if ((index_ & 7) == 0) { if (num_entries_ - index_ >= 8) { unpack8(); } else { is_block_mode_ = false; unpack1(); } } } else { unpack1(); } } return *this; } template<typename Allocator> void wrapped_compact_theta_sketch_alloc<Allocator>::const_iterator::unpack1() { const uint32_t i = index_ & 7; offset_ = unpack_bits(buffer_[i], entry_bits_, reinterpret_cast<const uint8_t*&>(ptr_), offset_); buffer_[i] += previous_; previous_ = buffer_[i]; } template<typename Allocator> void wrapped_compact_theta_sketch_alloc<Allocator>::const_iterator::unpack8() { unpack_bits_block8(buffer_, reinterpret_cast<const uint8_t*>(ptr_), entry_bits_); ptr_ = reinterpret_cast<const uint8_t*>(ptr_) + entry_bits_; for (int i = 0; i < 8; ++i) { buffer_[i] += previous_; previous_ = buffer_[i]; } } template<typename Allocator> auto wrapped_compact_theta_sketch_alloc<Allocator>::const_iterator::operator++(int) -> const_iterator { const_iterator tmp(*this); operator++(); return tmp; } template<typename Allocator> bool wrapped_compact_theta_sketch_alloc<Allocator>::const_iterator::operator!=(const const_iterator& other) const { if (entry_bits_ == 64) return ptr_ != other.ptr_; return index_ != other.index_; } template<typename Allocator> bool wrapped_compact_theta_sketch_alloc<Allocator>::const_iterator::operator==(const const_iterator& other) const { if (entry_bits_ == 64) return ptr_ == other.ptr_; return index_ == other.index_; } template<typename Allocator> auto wrapped_compact_theta_sketch_alloc<Allocator>::const_iterator::operator*() const -> reference { if (entry_bits_ == 64) return *reinterpret_cast<const uint64_t*>(ptr_); return buffer_[index_ & 7]; } template<typename Allocator> auto wrapped_compact_theta_sketch_alloc<Allocator>::const_iterator::operator->() const -> pointer { if (entry_bits_ == 64) return reinterpret_cast<const uint64_t*>(ptr_); return buffer_ + (index_ & 7); } } /* namespace datasketches */ #endif