sketches/Sketches.py (176 lines of code) (raw):
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sketches.Sampler as Sampler
from collections.abc import Mapping, Container
from sys import getsizeof
import numpy as np
import random
def prefixDict(d, prefix=""):
if not prefix:
return d
return dict( (f"{prefix}{k}", v) for k, v in d.items())
class Sketch:
name = "BaseSketch"
is_random = True
params = None
def add(self, x, **kwargs):
raise Exception
def query(self, q=None, parameters=None):
raise Exception
def getSize(self):
return 0
def getSizeBytes(self):
# should be a self reported measurement of size
return sys.getsizeof(self)
def info(self, prefix="sketch_"):
"""
Put stats that should go into dataframe here
"""
stats = {
"name": self.name,
"size": self.getSize()
}
return prefixDict(stats, prefix)
class SketchFactory:
"""
This factory method helps the sketch creation function get pickled properly
It takes 1) a sketch class and 2) arguments for creating a sketch and binds the arguments to yield
a function that creates a sketch with the given arguments
It avoids problems with using lambda function to bind arguments where classes referenced inside the
lambda function are not serialized and have no external reference when deserialized
"""
def __init__(self, sketch_class, seed, *args, **kwargs):
self.sketch_class = sketch_class
self.args = args
self.kwargs = kwargs
self.seed = seed
def __call__(self, seed=None):
return self.sketch_class(*self.args, **self.kwargs, seed=seed)
##########################################################################################
# for testing
class NoiseSketch(Sketch):
name = "NoiseSketch"
def add(self, x, **kwargs):
pass
def query(self, q, parameters=None):
return uniform(0,1)
class TopKSamplerSketch(Sketch):
def __init__(self, k, maxsize, seed=None):
self.topk_sampler = Sampler.TopKSampler(k, maxsize, seed=seed)
self.k = k
self.maxsize = maxsize
def add(self,x):
self.topk_sampler.add(x, 1)
def query(self, q=None, parameters=None):
"""
return topK item, cnt pairs
"""
arr = [(topk_item.nhat(), topk_item.item) for topk_item in self.topk_sampler.buffer]
arr.sort(reverse = True)
return [(x,nhat) for nhat, x in arr[:min(self.k,len(arr))]]
def getSizeBytes(self):
return sum([sys.getsizeof(x) for x in self.topk_sampler.buffer])
def getSize(self):
return len(self.topk_sampler.buffer)
def info(self, prefix="sketch_"):
info = super().info(prefix=None)
info["seed"] = self.topk_sampler.seed
info['maxsize'] = self.maxsize
info['k'] = self.k
return prefixDict(info, prefix=prefix)
########################################################################
from datasketches import frequent_strings_sketch, frequent_items_error_type
from math import log2,ceil
class FrequentItemsSketch(Sketch):
is_random = False # deterministic sketch
cast_type = str
def __init__(self, k, maxsize, cast_type=str, seed=None):
self.k = k
self.maxsize = maxsize
v = int(ceil(log2(maxsize)))
self.sketch = frequent_strings_sketch(v)
self.cast_type= cast_type
def add(self, x, w=1):
self.sketch.update(str(x), w)
def query(self, q=None, parameters=None):
items = self.sketch.get_frequent_items(frequent_items_error_type.NO_FALSE_NEGATIVES)
result = [(self.cast_type(x), w) for x, lb, ub, w in items]
return result[:self.k]
def getSizeBytes(self):
return self.sketch.get_serialized_size_bytes()
# Not sure this is right
def getSize(self):
return self.sketch.get_num_active_items()
def info(self, prefix="sketch_"):
info = super().info(prefix=None)
info['bytes'] = self.getSizeBytes()
info['maxsize'] = self.maxsize
info['k'] = self.k
return prefixDict(info, prefix=prefix)
class OracleSketch(Sketch):
"""
This turns an oracle into an "exact" sketch
This would be more useful if the sketch sizes could be calculated here
"""
def __init__(self, oracle, seed=None):
self.oracle = oracle
def add(self, x, w=1):
self.oracle.add(x)
def query(self, q=None, parameters=None):
return self.oracle.query(None, q, parameters)
#####################################################
# quantile sketches
from datasketches import kll_floats_sketch, req_floats_sketch
from tdigest import TDigest
class KLLSketch(Sketch):
name = 'KLL'
def __init__(self, size=200, seed=None):
# if seed is not None:
# print("KLL sketch does not support seeding rng")
self.sketch = kll_floats_sketch(size)
def add(self, x):
self.sketch.update(x)
def query(self, q=None, parameters=None):
if q == 'quantile':
return self.sketch.get_quantile(parameters)
else:
return self.sketch.get_cdf([parameters])[0]
def getSizeBytes(self):
return self.sketch.get_num_retained()*8
def getSize(self):
return self.sketch.get_num_retained()
def info(self, prefix="sketch_"):
info = super().info(prefix=None)
info['bytes'] = self.getSizeBytes()
info['seed'] = random.getrandbits(64) # not a real seed but shows that runs are random
return prefixDict(info, prefix=prefix)
class REQSketch(KLLSketch):
name = 'REQ'
def __init__(self, size=200, seed=None):
# if seed is not None:
# print("REQ sketch does not support seeding rng")
self.sketch = req_floats_sketch(size)
class TDigestSketch(KLLSketch):
name = 'TDigest'
def __init__(self, delta=0.01, seed=None):
self.sketch = TDigest(delta=delta)
def add(self, x):
self.sketch.update(x)
def query(self, q=None, parameters=None):
if q == 'quantile':
return self.sketch.percentile(parameters*100)
else:
return self.sketch.cdf(parameters)
def getSizeBytes(self):
return self.getSize()*16
def getSize(self):
centroids = self.sketch.centroids_to_list()
return len(centroids)
#####################################################
#
# distinct counting sketches
#
from datasketches import hll_sketch, cpc_sketch, update_theta_sketch
import mmh3
class ThetaSketch(Sketch):
name = 'Theta'
def __init__(self, lg_k, p, seed=0):
self.sketch = update_theta_sketch(lg_k, p, seed)
self.seed = seed
self.lg_k = lg_k
def salt(self, x):
if self.seed == 0:
return x
elif isinstance(x, int) or isinstance(x, float):
return mmh3.hash64(bytes(x))[0]
else:
return mmh3.hash64(x)[0]
def add(self, x):
self.sketch.update(self.salt(x))
def query(self, q=None, parameters=None):
return self.sketch.get_estimate()
def getSizeBytes(self):
return self.sketch.get_num_retained()*32
def getSize(self):
return self.sketch.get_num_retained()
def info(self, prefix="sketch_"):
info = super().info(prefix=None)
info['bytes'] = self.getSizeBytes()
info['seed'] = self.seed
return prefixDict(info, prefix=prefix)
class HLLSketch(ThetaSketch):
name = 'HLL'
def __init__(self, lg_k, seed=0):
self.sketch = hll_sketch(lg_k)
self.lg_k = lg_k
self.seed = seed
def getSizeBytes(self):
return self.sketch.get_updatable_serialization_bytes()
def getSize(self):
return int(2**self.lg_k)