# sketches/Sampler.py
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import random
import numpy as np
from random import uniform
from sortedcontainers import SortedList
from collections import namedtuple
# Buffer entry used by the samplers: (priority, weight, item).
# Being a tuple, entries compare field by field, so a sorted container
# orders them by priority first.
Sample = namedtuple('Sample', ['priority', 'weight', 'item'])
from numba import jit
class Sampler:
    """
    Weighted priority sampler backed by a sorted buffer of
    (priority, weight, item) entries.

    Each added item draws a random priority uniformly from [0, 1/weight] and
    is buffered only when that priority is below the current threshold.
    Subclasses bound the sample by overriding postAdd (and optionally pop).
    """

    def __init__(self, seed=None):
        """
        Create an empty sampler.

        seed: optional seed for the private random generator.
        """
        self.seed = seed
        self.rng = random.Random(seed)
        self.processed = 0
        # Nothing has been evicted yet, so every priority is admitted.
        self.threshold = float("inf")
        self.buffer = SortedList()

    def cdf(self, v, weight):
        """
        Return weight * v capped at 1.0, i.e. the probability that a priority
        drawn uniformly from [0, 1/weight] falls below v.
        """
        return min(1.0, weight * v)

    def priority(self, item, weight):
        """
        Draw the random priority for an item: uniform on [0, 1/weight].
        The item itself is not used by this base implementation.
        """
        upper = 1./weight
        return self.rng.uniform(0, upper)

    def pop(self):
        """
        Evict the buffered entry with the largest priority, make that priority
        the new threshold, and return (priority, weight, item).
        """
        evicted_priority, evicted_weight, evicted_item = self.buffer.pop()
        self.threshold = evicted_priority
        return evicted_priority, evicted_weight, evicted_item

    def reduceSampleSize(self, k):
        """
        Evict entries until at most k remain in the buffer.
        """
        while k < len(self.buffer):
            self.pop()

    def postAdd(self, item, weight):
        """
        Hook invoked after an item has been buffered; no-op here.
        """
        return None

    def add(self, item, weight):
        """
        Offer an item with the given weight to the sampler.
        """
        self.processed += 1
        candidate = self.priority(item, weight)
        if not candidate < self.threshold:
            # Rejected: priority is not below the threshold.
            return
        self.buffer.add(Sample(candidate, weight, item))
        self.postAdd(item, weight)

    def getThreshold(self, item, w):
        """
        Return the threshold applying to the given item (the same for all
        items in this base class).
        """
        return self.threshold

    def items(self):
        """
        Generator which returns item, pseudo-inclusion probability
        """
        for _, weight, item in self.buffer:
            cutoff = self.getThreshold(item, weight)
            yield item, self.inc_probability(cutoff, weight)

    def inc_probability(self, x, w):
        """
        Return the pseudo-inclusion probability for threshold x and weight w.
        """
        return self.cdf(x, w)
##################################################################################################################
def filterSum(sampler, eval = lambda x: x, predicate = lambda x: True):
    """
    Estimate a filtered sum from a sample using inverse-probability weights.

    sampler:   object whose items() yields (item, inclusion probability) pairs
    eval:      maps an item to the value being summed (identity by default)
    predicate: only items for which this returns true contribute

    Returns (estimate, variance term) where the estimate accumulates
    value / pi and the variance term accumulates value^2 * (1 - pi) / pi^2.
    """
    total = 0
    variance = 0
    for item, prob in sampler.items():
        if not predicate(item):
            continue
        value = eval(item)
        total += value / prob
        variance += value*value * (1.0-prob) / (prob*prob)
    return total, variance
class BottomKSampler(Sampler):
    """
    Sampler that keeps at most k entries: those with the smallest priorities.
    """

    def __init__(self, k, seed=None):
        """
        k:    maximum number of entries retained in the buffer
        seed: optional seed forwarded to the base sampler's random generator
              (defaults to None, matching the previous unseeded behaviour)
        """
        super().__init__(seed)
        self.k = k

    def postAdd(self, item, weight):
        """
        After an insertion, evict the largest priorities until at most k
        entries remain.
        """
        self.reduceSampleSize(self.k)
#################################################################################################################
class SpaceBoundedSampler(Sampler):
    """
    Sampler whose buffered items are kept within a total size budget.

    The size of an item is measured by a caller-supplied function and the
    running total is tracked in self.size.
    """

    def __init__(self, budget, len=len, seed=None):
        """
        budget: maximum total size of the buffered items
        len:    callable returning the size of an item (builtin len by default)
        seed:   optional seed forwarded to the base sampler's random generator
        """
        super().__init__(seed)
        self.budget = budget
        self.size = 0
        self.len = len

    def pop(self):
        """
        Evict the entry with the largest priority, subtract its size from the
        running total and return (priority, weight, item) like the base class.
        """
        T, w, x = super().pop()
        self.size -= self.len(x)
        return T, w, x

    def compact(self, budget):
        """
        Evict entries until the tracked size fits within budget.
        """
        # Stop when the buffer is empty so an inconsistent size counter
        # cannot trigger a pop from an empty container.
        while self.size > budget and len(self.buffer) > 0:
            self.pop()

    def postAdd(self, item, weight):
        """
        Account for the size of the newly buffered item and re-establish the
        budget.
        """
        self.size += self.len(item)
        self.compact(self.budget)
##################################################################################################################
import xxhash
from enum import Enum
# Largest value representable in an unsigned 64-bit integer (2**64 - 1).
BIGVAL64 = (2**64-1)
class MultiStratifiedSampler(Sampler):
    """
    Priority sampler with one weight, priority and threshold per objective.

    Each item carries a sequence of weights (one per objective). A single
    uniform draw U is shared by all objectives and the priority vector is
    U / weight[i]. An item is kept while at least one of its priorities is
    below the corresponding threshold.

    Only compact the sample if the total sample size gets too large
    """
    UNKNOWN = 0
    REMOVE_CANDIDATE = 1
    NOT_CANDIDATE = 2

    def __init__(self, num_objectives, target_size, slack=1.2, seed=None):
        """
        num_objectives: number of weights supplied with every item
        target_size:    desired sample size; compaction starts once the
                        buffer exceeds target_size * slack
        slack:          over-allocation factor; also the factor by which the
                        per-objective size is shrunk on each compaction round
        seed:           optional seed for the private random generator
        """
        self.target_size = target_size
        self.buffer = SortedList()
        self.thresholds = [float("inf") for i in range(num_objectives)]
        self.composite_threshold = float("inf")
        # Start above target_size so the first compaction derives the real
        # per-objective size from the buffer.
        self.min_size_per_objective = target_size+1
        self.slack = slack
        self.processed = 0
        self.seed = seed
        self.rng = random.Random(seed)

    def item_rv(self, x):
        """
        Return the uniform(0, 1) variate shared by all objectives of item x.
        """
        # NOTE: a hash-based alternative (xxhash.xxh64(str(x)).intdigest()
        # / BIGVAL64) would tie the variate to the item; the seeded RNG is
        # used instead.
        return self.rng.uniform(0, 1)

    def priority(self, U, x, weight):
        """
        Return the per-objective priorities U / w for each weight w.
        """
        return [U / w for w in weight]

    def getThreshold(self, item, w):
        """
        Return the list of per-objective thresholds.
        """
        return self.thresholds

    def cdf(self, v, weight):
        """
        Return the largest per-objective value of min(1, w * x) over the
        paired thresholds v and weights; 0.0 when there are no objectives.
        """
        p = 0.
        for x, w in zip(v, weight):
            p = max(p, min(1.0, w * x))
        return p

    def pop(self):
        """
        Single-entry eviction is not supported; use compact() instead.

        Raises NotImplementedError (a subclass of Exception).
        """
        raise NotImplementedError(
            "MultiStratifiedSampler does not support pop(); use compact()")

    @classmethod
    def lt(cls, priority, threshold):
        """
        Return True if any priority is below its paired threshold.
        """
        return any(r < t for r, t in zip(priority, threshold))

    def getSizePerObjective(self):
        """
        Return, for each objective, how many buffered entries have a priority
        below that objective's threshold.
        """
        size_per_objective = [0]*len(self.thresholds)
        for R, _w, _x in self.buffer:
            for i, (r, t) in enumerate(zip(R, self.thresholds)):
                if r < t:
                    size_per_objective[i] += 1
        return size_per_objective

    def compact(self):
        """
        Shrink the buffer until it holds at most target_size * slack entries,
        lowering the per-objective size by a factor of slack each round.
        """
        while len(self.buffer) > self.target_size * self.slack:
            size_before = len(self.buffer)
            if self.min_size_per_objective > self.target_size:
                self.min_size_per_objective = max(self.getSizePerObjective())
            self.min_size_per_objective = int(self.min_size_per_objective / self.slack)
            self.compactToSize(self.min_size_per_objective)
            # Once the per-objective size has bottomed out and a round evicts
            # nothing, further rounds are identical: stop instead of spinning.
            if self.min_size_per_objective <= 0 and len(self.buffer) >= size_before:
                break

    def getScaledThresholds(self, min_size_per_objective):
        """
        Compute new per-objective thresholds from the buffered priorities.

        For each objective the threshold is the sorted priority at index
        min_size_per_objective + 1, clamped to the buffer, and never larger
        than the current threshold of that objective.
        """
        if len(self.buffer) == 0:
            return list(self.thresholds)
        # Clamp so small buffers cannot index past the end of the list.
        index = min(max(min_size_per_objective + 1, 0), len(self.buffer) - 1)
        scaled_thresholds = []
        for i, current in enumerate(self.thresholds):
            priorities = sorted(R[i] for R, _w, _x in self.buffer)
            # Thresholds may only decrease: entries rejected under a lower
            # threshold cannot be recovered by raising it again.
            scaled_thresholds.append(min(current, priorities[index]))
        return scaled_thresholds

    def compactToSize(self, min_size_per_objective):
        """
        Recompute the thresholds and keep only the entries that are still
        below at least one of them.
        """
        self.thresholds = self.getScaledThresholds(min_size_per_objective)
        new_buffer = SortedList()
        for s in self.buffer:
            if self.lt(s.priority, self.thresholds):
                new_buffer.add(s)
        self.buffer = new_buffer

    def add(self, item, weight):
        """
        Offer an item with one weight per objective to the sampler.
        """
        self.processed += 1
        U = self.item_rv(item)
        R = self.priority(U, item, weight)
        if self.lt(R, self.thresholds):
            self.buffer.add(Sample(R, weight, item))
            self.compact()
from math import sqrt
##################################################################################################################
#= namedtuple('TopKItem', ['priority', 'item', 'threshold', 'count'])
class TopKItem:
    """
    Mutable record for one tracked item: its priority, weight, the threshold
    stored with it, and a repeat count. Records order by priority.
    """

    def __init__(self, priority, item, weight, threshold, count):
        self.item = item
        self.weight = weight
        self.count = count
        self.threshold = threshold
        self.priority = priority

    def __lt__(self, other):
        """Order records by ascending priority."""
        mine, theirs = self.priority, other.priority
        return mine < theirs

    # 1/min(1, threshold) is a hack
    def nhat(self, f=lambda x: x):
        """
        Return 1 / min(1, threshold) + count. The parameter f is accepted but
        not used.
        """
        capped = min(1, self.threshold)
        base = 1/capped
        return (base + self.count)

    def __str__(self):
        """Render as 'item weight count'."""
        return f"{self.item} {self.weight} {self.count}"
class TopKSampler(Sampler):
    """
    Sampler that tracks distinct items with repeat counts and limits both the
    number of tracked items (maxsize) and the number of heavy items (topk).
    """

    def __init__(self, topk, maxsize, seed=None):
        """
        topk:    maximum number of items allowed to count as heavy
        maxsize: maximum number of tracked items
        seed:    optional seed for the private random generator
        """
        self.seed = seed
        self.rng = random.Random(seed)
        self.processed = 0
        self.threshold = float("inf")
        self.topk = topk
        self.maxsize = maxsize
        self.item_dict = {}
        self.heavy_set = set()
        self.buffer = [] #SortedList()

    def size(self):
        """Return the number of tracked items."""
        return len(self.item_dict)

    def nhat_infreq(self):
        """
        Return 1 / threshold, the baseline an item's nhat is compared
        against in is_infreq.
        """
        return 1. / self.threshold

    def is_infreq(self, item, mingap=0):
        """
        Return True when the tracked item has never repeated, or when its
        nhat minus mingap is below nhat_infreq().
        """
        entry = self.item_dict[item]
        return entry.count == 0 or entry.nhat() - mingap < self.nhat_infreq()

    def getNumHeavy(self):
        """
        Count members of heavy_set that are not infrequent, using a gap of
        10% of nhat_infreq().
        """
        heavy = 0
        for member in self.heavy_set:
            gap = 0.1*self.nhat_infreq()
            if not self.is_infreq(member, gap):
                heavy += 1
        return heavy

    def items(self):
        """
        Generator which returns item, pseudo-inclusion probability
        """
        for entry in self.buffer:
            weight = entry.weight
            item = entry.item
            cutoff = self.getThreshold(item, weight)
            yield item, self.inc_probability(cutoff, weight)

    def getTotal(self):
        """Return the sum of nhat() over all buffered entries."""
        return sum(entry.nhat() for entry in self.buffer)

    def compact(self):
        """
        Evict the largest-priority entries until both the size limit and the
        heavy-item limit hold, lowering the threshold to each evicted priority.
        """
        within_limits = (self.getNumHeavy() <= self.topk
                         and self.size() <= self.maxsize)
        if within_limits:
            return
        # Ascending priority order, so pop() removes the largest priority.
        self.buffer.sort()
        while self.size() > self.maxsize or self.getNumHeavy() > self.topk:
            evicted = self.buffer.pop()
            del self.item_dict[evicted.item]
            self.heavy_set.discard(evicted.item)
            self.threshold = min(self.threshold, evicted.priority) # shouldn't need take min

    def add(self, item, weight):
        """
        Offer an item: an already tracked item has its priority scaled down
        and its count incremented; a new item is tracked only if its random
        priority is below the threshold.
        """
        self.processed += 1
        existing = self.item_dict.get(item) if item in self.item_dict else None
        if existing is not None:
            estimate = existing.nhat()
            existing.priority *= estimate / (estimate+1)
            existing.count += 1
            if not self.is_infreq(item):
                self.heavy_set.add(item)
            return
        candidate = self.priority(item, weight)
        if not candidate < self.threshold:
            return
        entry = TopKItem(candidate, item, weight, self.threshold, 0)
        self.buffer.append(entry)
        self.item_dict[item] = entry
        self.compact()
##################################################################################################################