# jupyter/comparison-to-datasketch/cardinality_error_experiment.py
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import sys
import argparse
from datetime import datetime
import pandas as pd
import numpy as np
from utils import distinct_number_sequence
import datasketches as ds
import datasketch as d
import mmh3
import os
import matplotlib.pyplot as plt
class ErrorCardinalityProfile:
    """Compares estimation error over a range of input cardinalities.

    Runs ``2**lg_trials`` independent trials.  In each trial a fresh pair of
    sketches -- an Apache DataSketches HLL and a ``datasketch``
    HyperLogLogPlusPlus -- consumes the same distinct-item stream, and the
    signed relative error of both estimators is recorded at a fixed grid of
    "plot point" cardinalities.  Partial results are flushed to CSV in a
    date-stamped directory whenever the trial count reaches a power of two.
    """
    def __init__(self, sketch_lgk: int, lg_trials: int, max_lgN: int):
        """
        :param sketch_lgk: log2 of the number of buckets (k) in each sketch.
        :param lg_trials: log2 of the number of trials to run.
        :param max_lgN: log2 of the largest input cardinality tested.
        """
        self.sketch_lgk = sketch_lgk
        self.num_trials = 2**lg_trials
        self.max_lgN = max_lgN
        self.max_num_distincts = np.uint64(2 ** self.max_lgN)
        self.directory_name = "hll_accuracy_profile_" + datetime.today().strftime('%Y%m%d')
        # makedirs(exist_ok=True) avoids the check-then-create race of the
        # original os.path.exists()/os.mkdir() pair.
        os.makedirs(self.directory_name, exist_ok=True)
        self.file_extension = "_" + datetime.today().strftime('%H%M') + f"lgK_{self.sketch_lgk}_lgT_{lg_trials}"
        # run() steps through this list assuming strictly increasing, unique
        # values, so de-duplicate and sort once.  (The original generated the
        # list twice and merged the identical copies; one pass is equivalent.)
        self.plot_points = sorted(set(self._generate_plot_points()))
        print(self.plot_points)
        # Result holders: one row per plot point, one column per trial.
        self.DataSketches_results_arr = np.zeros((len(self.plot_points), self.num_trials), dtype=float)
        self.datasketch_results_arr = np.zeros_like(self.DataSketches_results_arr)
        self.DataSketches_results_df = pd.DataFrame(index=self.plot_points, columns=None)
        self.datasketch_results_df = pd.DataFrame(index=self.plot_points, columns=None)
        self.data = np.random.randn(len(self.plot_points), self.num_trials)
        print("Data shape: ", self.data.shape)
    def _generate_plot_points(self) -> list:
        """Return the input cardinalities at which the error is measured.

        For every octave [2**lgk, 2**(lgk+1)) up to 2**max_lgN this emits up
        to eight log-spaced points; truncation to uint64 can collapse nearby
        points, and np.unique removes those duplicates within each octave.
        """
        all_plot_points = []
        for lgk in range(1, self.max_lgN + 1):
            points = np.unique(np.logspace(start=lgk, stop=lgk + 1, num=8, endpoint=False, base=2, dtype=np.uint64))
            all_plot_points.extend(points)
        all_plot_points.sort()
        return all_plot_points
    def _bad_hll_range(self) -> list:
        """Return log-spaced cardinalities around the n ~ 2.5k region.

        Produces 49 points (num=50, first dropped) bracketing 2.5 * 2**lgk --
        presumably the estimator's transition region; TODO confirm intent.
        NOTE(review): not called anywhere in this module.
        """
        log2_sketch_threshold = np.log2(2.5 * (2**self.sketch_lgk))
        start = np.floor(log2_sketch_threshold).astype(np.uint64)
        stop = np.ceil(log2_sketch_threshold).astype(np.uint64) + 1
        points = np.logspace(start, stop, num=50, endpoint=False, base=2, dtype=np.uint64)[1:]
        return points
    def _is_power_of_two(self, a):
        """Bitwise check that value ``a`` is a power of two (0 is not)."""
        return (a & (a - 1) == 0) and a != 0
    def _results_to_df(self, start_: int, end_: int, arr: np.array, df: pd.DataFrame):
        """Concatenate columns ``start_ .. end_ - 1`` of ``arr`` onto ``df``.

        Returns a new DataFrame; ``df`` itself is not modified.
        NOTE(review): not called anywhere in this module.
        """
        new_df = pd.DataFrame(arr[:, start_:end_], index=df.index, columns=np.arange(start_, end_).tolist())
        print("concatenating: ", new_df)
        concat_df = pd.concat([df, new_df], axis=1)
        return concat_df
    def run(self):
        """Runs the experiment.

        Each trial feeds a fresh input sequence into both sketches; when the
        update count hits the next plot point the signed relative error
        (estimate - n) / n of each estimator is stored.  Partial results are
        written to CSV only at power-of-two trial counts (> 1), so the number
        of writes is logarithmic in the trial count.
        """
        seq_start = np.uint64(2345234)
        distinct_number = np.uint64(3462)
        # Rows are trials, columns are plot points (transposed before writing).
        ds_all_results = np.zeros((self.num_trials, len(self.plot_points)))
        d_all_results = np.zeros_like(ds_all_results)
        for trial in range(1, self.num_trials + 1):
            # Fresh sketches for every trial.
            hll = ds.hll_sketch(self.sketch_lgk, ds.HLL_8)
            h = d.HyperLogLogPlusPlus(p=self.sketch_lgk, hashfunc=lambda x: mmh3.hash64(x, signed=False)[0])
            plot_point_index = 0  # Return to the start of the plot points list to generate the data
            plot_point_value = self.plot_points[plot_point_index]
            total_updates = 0
            seq_start += distinct_number  # Start a new input sequence
            # Per-trial buffers, copied into the *_all_results rows below.
            ds_results = np.zeros((len(self.plot_points),))
            d_results = np.zeros_like(ds_results)
            for new_number in distinct_number_sequence(seq_start):
                hll.update(new_number)
                h.update(str(new_number).encode('utf8'))
                total_updates += 1
                if total_updates == plot_point_value:
                    # Signed relative error of each estimator at this cardinality.
                    ds_results[plot_point_index] = (hll.get_estimate() - plot_point_value) / plot_point_value
                    d_results[plot_point_index] = (h.count() - plot_point_value) / plot_point_value
                    plot_point_index += 1
                    if plot_point_index < len(self.plot_points):
                        plot_point_value = self.plot_points[plot_point_index]
                    else:
                        # All plot points for this trial recorded; stop streaming.
                        break
            # Trials are counted from 1, so subtract 1 for the row index.
            ds_all_results[trial - 1, :] = ds_results
            d_all_results[trial - 1, :] = d_results
            if self._is_power_of_two(trial) and trial > 1:
                # write the array only a logarithmic number of times
                temporary_ds_results = ds_all_results[0:trial, :]
                temporary_d_results = d_all_results[0:trial, :]
                print(f"#################### PARTIAL RESULTS FOR {trial} TRIALS: DATASKETCHES ####################")
                self.DataSketches_results_df = pd.DataFrame(temporary_ds_results.T, index=self.DataSketches_results_df.index, columns=np.arange(trial).tolist())
                self.DataSketches_results_df.to_csv(
                    self.directory_name + "/DataSketches_hll" + self.file_extension + f"trials_{trial}.csv",
                    index_label="n")
                self.datasketch_results_df = pd.DataFrame(temporary_d_results.T,
                                                          index=self.datasketch_results_df.index,
                                                          columns=np.arange(trial).tolist())
                self.datasketch_results_df.to_csv(
                    self.directory_name + "/datasketch_hll" + self.file_extension + f"trials_{trial}.csv",
                    index_label="n"
                )
                print(self.DataSketches_results_df)
                print(f"#################### PARTIAL RESULTS FOR {trial} TRIALS: datasketch ####################")
                print(self.datasketch_results_df)
def main(argv):
    """Build and run the error-cardinality experiment from parsed CLI args.

    :param argv: mapping containing exactly the keys 'lgk', 'lgt' and 'lgN'.
    :raises ValueError: if *argv* does not hold exactly three settings.
    """
    # Validate explicitly rather than with `assert`, which is stripped
    # when Python runs with -O and would silently skip the check.
    if len(argv) != 3:
        raise ValueError(f"expected exactly 3 arguments (lgk, lgt, lgN), got {len(argv)}")
    sketch_lgk = argv['lgk']
    lg_trials = argv['lgt']
    max_lg_n = argv['lgN']  # upper bound on log2(number of distinct inputs)
    experiment = ErrorCardinalityProfile(sketch_lgk, lg_trials, max_lg_n)
    experiment.run()
if __name__ == "__main__":
    # Command-line entry point: every flag is a required integer option.
    parser = argparse.ArgumentParser(description='Error-cardinality profile of HyperLogLog')
    for flag, help_text in (
        ('lgk', 'Log2(k) value for the number of buckets in the sketch.'),
        ('lgt', 'Log2(T) value for number of trials t.'),
        ('lgN', 'Largest permissible log2(.) value for input size.'),
    ):
        parser.add_argument('-' + flag, '--' + flag, help=help_text, type=int, required=True)
    main(vars(parser.parse_args()))