jupyter/comparison-to-datasketch/cardinality_error_experiment.py (114 lines of code) (raw):

#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Compare the relative error of two HyperLogLog implementations.

For a fixed sketch size the script feeds the same stream of distinct items to
an Apache DataSketches ``hll_sketch`` and to a ``datasketch``
``HyperLogLogPlusPlus`` and records the relative estimation error of both at a
log-spaced set of input cardinalities, over many independent trials.
"""

import argparse
import os
import sys
from datetime import datetime

import datasketch as d
import datasketches as ds
import matplotlib.pyplot as plt
import mmh3
import numpy as np
import pandas as pd

from utils import distinct_number_sequence


class ErrorCardinalityProfile:
    """Generates an experiment comparing the error over different cardinalities.

    Args:
        sketch_lgk: log2 of the sketch size; passed as ``lg_k`` to the
            DataSketches sketch and as the precision ``p`` to datasketch.
        lg_trials: log2 of the number of trials; ``2 ** lg_trials`` are run.
        max_lgN: largest power of two at which plot points are generated;
            points are produced for every exponent in ``1..max_lgN``.

    Constructing an instance creates the output directory
    ``hll_accuracy_profile_<YYYYMMDD>`` in the current working directory.
    """

    def __init__(self, sketch_lgk: int, lg_trials: int, max_lgN: int):
        self.sketch_lgk = sketch_lgk
        self.num_trials = 2**lg_trials
        self.max_lgN = max_lgN
        self.max_num_distincts = np.uint64(2 ** self.max_lgN)

        # Sample the clock once so the directory name and the file suffix can
        # never disagree (e.g. when the run starts right at midnight).
        now = datetime.today()
        self.directory_name = "hll_accuracy_profile_" + now.strftime('%Y%m%d')
        # exist_ok avoids the check-then-create race of exists() + mkdir().
        os.makedirs(self.directory_name, exist_ok=True)
        self.file_extension = (
            "_" + now.strftime('%H%M')
            + f"lgK_{self.sketch_lgk}_lgT_{lg_trials}"
        )

        # run() advances through the plot points with a single increasing
        # counter, so the points must be strictly increasing: de-duplicate
        # and sort.
        # NOTE(review): the original extended this list with a second call to
        # _generate_plot_points(), which is a no-op after de-duplication.
        # _bad_hll_range() is never used anywhere in this file; it may have
        # been the intended second source of points -- confirm with the author.
        self.plot_points = sorted(set(self._generate_plot_points()))
        print(self.plot_points)

        # Initialise the data structures for results
        self.DataSketches_results_arr = np.zeros(
            (len(self.plot_points), self.num_trials), dtype=float
        )
        self.datasketch_results_arr = np.zeros_like(
            self.DataSketches_results_arr
        )
        self.DataSketches_results_df = pd.DataFrame(
            index=self.plot_points, columns=None
        )
        self.datasketch_results_df = pd.DataFrame(
            index=self.plot_points, columns=None
        )
        self.data = np.random.randn(len(self.plot_points), self.num_trials)
        print("Data shape: ", self.data.shape)

    def _generate_plot_points(self) -> list:
        """Return the sorted input cardinalities at which error is measured.

        For every exponent ``e`` in ``1..max_lgN`` this takes eight
        log-spaced values in ``[2**e, 2**(e+1))`` and casts them to
        ``np.uint64``. Values that collide after the cast are dropped, so
        small exponents contribute fewer than eight points.
        """
        all_plot_points = []
        for lgk in range(1, self.max_lgN + 1):
            points = np.unique(
                np.logspace(
                    start=lgk,
                    stop=lgk + 1,
                    num=8,
                    endpoint=False,
                    base=2,
                    dtype=np.uint64,
                )
            )
            all_plot_points.extend(points)
        all_plot_points.sort()
        return all_plot_points

    def _bad_hll_range(self) -> np.ndarray:
        """Return log-spaced points around ``n = 2.5 * k`` (``k = 2**sketch_lgk``).

        Fifty log-spaced values are taken between the power of two at or
        below ``2.5 * k`` and twice the power of two at or above it; the
        first value is dropped, leaving 49 ``np.uint64`` points. The result
        is not de-duplicated.
        """
        log2_sketch_threshold = np.log2(2.5 * (2**self.sketch_lgk))
        start = np.floor(log2_sketch_threshold).astype(np.uint64)
        stop = np.ceil(log2_sketch_threshold).astype(np.uint64) + 1
        points = np.logspace(
            start, stop, num=50, endpoint=False, base=2, dtype=np.uint64
        )[1:]
        return points

    def _is_power_of_two(self, a):
        """Return True when ``a`` is a non-zero power of two.

        A power of two has exactly one bit set, so clearing the lowest set
        bit with ``a & (a - 1)`` leaves zero. In Python ``&`` binds tighter
        than ``==``; the parentheses below only make that explicit.
        """
        return ((a & (a - 1)) == 0) and a != 0

    def _results_to_df(self, start_: int, end_: int, arr: np.ndarray,
                       df: pd.DataFrame):
        """Append columns ``start_ .. end_ - 1`` of ``arr`` to ``df``.

        The new columns are labelled with their column index in ``arr`` and
        share ``df``'s index. Returns the concatenated frame; ``df`` itself
        is not modified.
        """
        new_df = pd.DataFrame(
            arr[:, start_:end_],
            index=df.index,
            columns=np.arange(start_, end_).tolist(),
        )
        print("concatenating: ", new_df)
        concat_df = pd.concat([df, new_df], axis=1)
        return concat_df

    def _run_trial(self, seq_start):
        """Run one trial and return both sketches' relative errors.

        Fresh sketches are fed the items yielded by
        ``distinct_number_sequence(seq_start)``. Each time the number of
        updates reaches the next plot point ``n`` the relative error
        ``(estimate - n) / n`` of each sketch is recorded.

        Returns:
            ``(ds_results, d_results)``: two 1-D arrays with one entry per
            plot point, for DataSketches and datasketch respectively.
        """
        # Initialise the sketches
        hll = ds.hll_sketch(self.sketch_lgk, ds.HLL_8)
        h = d.HyperLogLogPlusPlus(
            p=self.sketch_lgk,
            hashfunc=lambda x: mmh3.hash64(x, signed=False)[0],
        )

        # Return to the start of the plot points list to generate the data
        plot_point_index = 0
        plot_point_value = self.plot_points[plot_point_index]
        total_updates = 0

        # Temporary result data structure
        ds_results = np.zeros((len(self.plot_points),))
        d_results = np.zeros_like(ds_results)

        for new_number in distinct_number_sequence(seq_start):
            hll.update(new_number)
            h.update(str(new_number).encode('utf8'))
            total_updates += 1
            if total_updates != plot_point_value:
                continue

            ds_results[plot_point_index] = (
                (hll.get_estimate() - plot_point_value) / plot_point_value
            )
            d_results[plot_point_index] = (
                (h.count() - plot_point_value) / plot_point_value
            )
            plot_point_index += 1
            if plot_point_index >= len(self.plot_points):
                break  # every plot point has been measured
            plot_point_value = self.plot_points[plot_point_index]

        return ds_results, d_results

    def _write_partial_results(self, trial, ds_all_results, d_all_results):
        """Store, save and print the results of the first ``trial`` trials.

        Both result frames are rebuilt with one row per plot point and one
        column per completed trial, written to CSV files inside
        ``self.directory_name`` and printed.
        """
        temporary_ds_results = ds_all_results[0:trial, :]
        temporary_d_results = d_all_results[0:trial, :]
        trial_columns = np.arange(trial).tolist()

        print(f"#################### PARTIAL RESULTS FOR {trial} TRIALS: DATASKETCHES ####################")
        self.DataSketches_results_df = pd.DataFrame(
            temporary_ds_results.T,
            index=self.DataSketches_results_df.index,
            columns=trial_columns,
        )
        self.DataSketches_results_df.to_csv(
            self.directory_name + "/DataSketches_hll" + self.file_extension
            + f"trials_{trial}.csv",
            index_label="n",
        )

        self.datasketch_results_df = pd.DataFrame(
            temporary_d_results.T,
            index=self.datasketch_results_df.index,
            columns=trial_columns,
        )
        self.datasketch_results_df.to_csv(
            self.directory_name + "/datasketch_hll" + self.file_extension
            + f"trials_{trial}.csv",
            index_label="n",
        )

        print(self.DataSketches_results_df)
        print(f"#################### PARTIAL RESULTS FOR {trial} TRIALS: datasketch ####################")
        print(self.datasketch_results_df)

    def run(self):
        """Runs the experiment.

        Executes ``self.num_trials`` trials. Results are written to disk
        only a logarithmic number of times: after trials 2, 4, 8, ... and
        always after the final trial.
        """
        seq_start = np.uint64(2345234)
        distinct_number = np.uint64(3462)
        ds_all_results = np.zeros((self.num_trials, len(self.plot_points)))
        d_all_results = np.zeros_like(ds_all_results)

        # Trials are counted from 1 so the power-of-two test is on the number
        # of trials completed so far.
        for trial in range(1, self.num_trials + 1):
            seq_start += distinct_number  # Start a new input sequence
            ds_results, d_results = self._run_trial(seq_start)

            # subtract 1 as we use 1-based indexing for the trial count.
            ds_all_results[trial - 1, :] = ds_results
            d_all_results[trial - 1, :] = d_results

            is_checkpoint = self._is_power_of_two(trial) and trial > 1
            # Without this, a single-trial run (lg_trials == 0) would do all
            # the work and never write anything.
            is_final_trial = trial == self.num_trials
            if is_checkpoint or is_final_trial:
                self._write_partial_results(
                    trial, ds_all_results, d_all_results
                )


def main(argv):
    """Build and run the experiment from parsed command-line arguments.

    Args:
        argv: mapping with exactly the keys ``'lgk'``, ``'lgt'`` and
            ``'lgN'``.

    Raises:
        ValueError: if ``argv`` does not hold exactly three entries.
    """
    # Explicit check rather than ``assert`` so it survives ``python -O``.
    if len(argv) != 3:
        raise ValueError(
            f"expected exactly 3 arguments (lgk, lgt, lgN), got {len(argv)}"
        )
    sketch_lgk = argv['lgk']
    lg_trials = argv['lgt']
    max_lg_n = argv['lgN']  # FOR SETTING NUMBER OF DISTINCTS
    experiment = ErrorCardinalityProfile(sketch_lgk, lg_trials, max_lg_n)
    experiment.run()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description='Error-cardinality profile of HyperLogLog'
    )
    parser.add_argument(
        '-lgk', '--lgk',
        help='Log2(k) value for the number of buckets in the sketch.',
        type=int, required=True,
    )
    parser.add_argument(
        '-lgt', '--lgt',
        help='Log2(T) value for number of trials t.',
        type=int, required=True,
    )
    parser.add_argument(
        '-lgN', '--lgN',
        help='Largest permissible log2(.) value for input size.',
        type=int, required=True,
    )
    args = vars(parser.parse_args())
    main(args)