go/distinct_count_accuracy_profile.go (236 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"fmt"
"github.com/apache/datasketches-go/common"
"github.com/apache/datasketches-go/hll"
"github.com/apache/datasketches-go/kll"
"math"
"strings"
"time"
)
// DistinctCountAccuracyProfile holds the state of one accuracy-profile job for
// an HLL sketch: the job configuration, the sketch under test, and one
// statistics accumulator per measured unique-count point.
type DistinctCountAccuracyProfile struct {
// config is the job configuration; this file reads lgK, lgQK, lgMinU, lgMaxU,
// uppo, lgMinT, lgMaxT, tppo and interData from it.
config distinctCountJobConfigType
// sketch is the HLL sketch under test; runTrial resets it at the start of
// every trial and then feeds it fresh keys.
sketch hll.HllSketch
// stats has one entry per unique-count point. Every entry is created by
// newAccuracyStats, and process/runTrial type-assert entries to *accuracyStats.
stats []baseAccuracyStats
// startTime is the construction time in Unix milliseconds; run subtracts it
// from the current time to report cumulative elapsed time.
startTime int64
}
// accuracyStats accumulates, across trials, what the sketch reported at one
// fixed true unique count (trueValue). update adds one trial's observations;
// process turns the running sums into per-trial means.
type accuracyStats struct {
// qsk is a KLL quantiles sketch that receives every estimate passed to update.
qsk *kll.ItemsSketch[float64]
// sumLB3, sumLB2 and sumLB1 are running sums of the lower bounds that runTrial
// obtains from GetLowerBound(3), GetLowerBound(2) and GetLowerBound(1).
sumLB3 float64
sumLB2 float64
sumLB1 float64
// sumUB3, sumUB2 and sumUB1 are running sums of the upper bounds that runTrial
// obtains from GetUpperBound(3), GetUpperBound(2) and GetUpperBound(1).
sumUB3 float64
sumUB2 float64
sumUB1 float64
// sumEst is the running sum of the raw estimates.
sumEst float64
// sumRelErr is the running sum of est/trueValue - 1.
sumRelErr float64
// sumSqRelErr is, despite its name, the running sum of the squared ABSOLUTE
// error (est - trueValue)^2; process divides by trueValue^2 before taking the
// square root to obtain the RMS relative error.
sumSqRelErr float64
// trueValue is the exact number of distinct keys fed to the sketch when this
// point is measured.
trueValue uint64
}
// NewDistinctCountAccuracyProfile builds a profile for the given job
// configuration and HLL target type.
//
// It creates the HLL sketch with config.lgK, builds one statistics accumulator
// per unique-count point between 2^lgMinU and 2^lgMaxU (config.uppo is passed
// through to the point-series helper), and records the construction time in
// Unix milliseconds as the start time used by run.
//
// The function panics if the sketch cannot be created. Previously the error was
// discarded, which left a nil sketch that only failed later, inside runTrial,
// with an uninformative nil dereference.
func NewDistinctCountAccuracyProfile(config distinctCountJobConfigType, tgtType hll.TgtHllType) *DistinctCountAccuracyProfile {
	sketch, err := hll.NewHllSketch(config.lgK, tgtType)
	if err != nil {
		// The signature has no error result, so fail fast with context
		// rather than returning a profile that cannot run.
		panic(fmt.Errorf("creating HLL sketch with lgK=%v: %w", config.lgK, err))
	}
	return &DistinctCountAccuracyProfile{
		config:    config,
		sketch:    sketch,
		stats:     buildLog2AccuracyStatsArray(config.lgMinU, config.lgMaxU, config.uppo, config.lgQK),
		startTime: time.Now().UnixMilli(),
	}
}
// newAccuracyStats returns an empty accumulator for the unique-count point
// trueValue, backed by a KLL float64 quantiles sketch of size parameter k.
//
// k must fit in a uint16 because that is the type the KLL constructor takes;
// an out-of-range k used to be truncated silently by the conversion. The
// function panics when k is out of range or when the KLL constructor reports
// an error. Previously that error was discarded, leaving a nil qsk that failed
// on first use.
func newAccuracyStats(k int, trueValue uint64) *accuracyStats {
	if k < 0 || k > math.MaxUint16 {
		panic(fmt.Errorf("quantiles sketch k=%d does not fit in uint16", k))
	}
	// NOTE(review): the second argument (8) is presumably the sketch's "m"
	// parameter; it is passed through unchanged. Confirm against the kll package.
	qsk, err := kll.NewKllItemsSketch[float64](uint16(k), 8, common.ArrayOfDoublesSerDe{})
	if err != nil {
		panic(fmt.Errorf("creating KLL quantiles sketch with k=%d: %w", k, err))
	}
	return &accuracyStats{
		qsk:       qsk,
		trueValue: trueValue,
	}
}
// run executes the whole trial schedule and prints progress to stdout.
//
// Trials are run in batches: the first batch brings the cumulative count to
// 2^lgMinT, and each later batch advances it to the value pwr2SeriesNext
// returns for config.tppo, until the count reaches 2^lgMaxT. After each batch
// the results table is printed (always for the final batch, and for
// intermediate batches only when config.interData is set), followed by a block
// of timing statistics.
func (d *DistinctCountAccuracyProfile) run() {
	firstBatch := 1 << d.config.lgMinT
	trialLimit := 1 << d.config.lgMaxT
	uniquesPerTrial := 1 << d.config.lgMaxU

	// key is threaded through every trial so that no key value is ever reused.
	var key uint64

	for done := 0; done < trialLimit; {
		target := firstBatch
		if done != 0 {
			target = int(pwr2SeriesNext(d.config.tppo, uint64(done)))
		}
		for pending := target - done; pending > 0; pending-- {
			key = d.runTrial(key)
		}
		done = target

		// The final table is always printed; intermediate ones are opt-in.
		if done >= trialLimit || d.config.interData {
			var table strings.Builder
			d.setHeader(&table)
			d.process(done, &table)
			fmt.Println(table.String())
		}

		fmt.Printf("Config: : %+v\n", d.config)
		fmt.Printf("Cum Trials : %d\n", done)
		fmt.Printf("Cum Updates : %d\n", key)

		elapsedMs := time.Now().UnixMilli() - d.startTime
		fmt.Printf("Cum Time : %s\n", time.Duration(elapsedMs)*time.Millisecond)

		msPerTrial := float64(elapsedMs) / float64(done)
		nsPerUpdate := msPerTrial * 1e6 / float64(uniquesPerTrial)
		fmt.Printf("Time Per Trial, mSec: %f\n", msPerTrial)
		fmt.Printf("Avg Update Time, nSec: %f\n", nsPerUpdate)
		fmt.Printf("Date Time : %s\n", time.Now().Format(time.RFC3339))

		// Linear extrapolation from the average trial time observed so far.
		remaining := time.Duration(int64(msPerTrial*float64(trialLimit-done))) * time.Millisecond
		fmt.Printf("Est Time to Complete: %s\n", remaining)
		fmt.Printf("Est Time at Completion: %s\n", time.Now().Add(remaining).Format(time.RFC3339))
		fmt.Println("")
	}
}
// process appends one tab-separated, newline-terminated row per unique-count
// point to sb, summarising the cumTrials trials accumulated so far.
//
// Each row holds: the true unique count, the mean estimate, the mean relative
// error, the RMS relative error, the trial count, the quantiles of the
// estimate at the GAUSSIANS_3SD ranks expressed as relative error, and the six
// averaged bounds (LB3, LB2, LB1, UB1, UB2, UB3) expressed as relative error.
func (d *DistinctCountAccuracyProfile) process(cumTrials int, sb *strings.Builder) {
	trials := float64(cumTrials)
	for _, entry := range d.stats {
		st := entry.(*accuracyStats)
		truth := float64(st.trueValue)

		meanEst := st.sumEst / trials
		meanRelErr := st.sumRelErr / trials
		// sumSqRelErr holds squared absolute error, so normalise by truth^2
		// before taking the root to get a relative figure.
		rmsRelErr := math.Sqrt(st.sumSqRelErr / trials / (truth * truth))

		// avgRel turns a running sum of bounds into the mean bound expressed
		// as a relative error against the true count.
		avgRel := func(sum float64) float64 { return sum/trials/truth - 1.0 }
		lb3, lb2, lb1 := avgRel(st.sumLB3), avgRel(st.sumLB2), avgRel(st.sumLB1)
		ub1, ub2, ub3 := avgRel(st.sumUB1), avgRel(st.sumUB2), avgRel(st.sumUB3)

		fmt.Fprintf(sb, "%d\t%e\t%e\t%e\t%d\t", st.trueValue, meanEst, meanRelErr, rmsRelErr, cumTrials)

		// Quantiles of the estimate distribution. The error is ignored, as in
		// the original; on failure no quantile columns are written for the row.
		ranks, _ := st.qsk.GetQuantiles(GAUSSIANS_3SD, true)
		for _, v := range ranks {
			fmt.Fprintf(sb, "%e\t", v/truth-1.0)
		}

		// Averaged bounds.
		fmt.Fprintf(sb, "%e\t%e\t%e\t%e\t%e\t%e\n", lb3, lb2, lb1, ub1, ub2, ub3)
	}
}
// setHeader appends the tab-separated column header row for the table that
// process produces to sb, and returns sb's full contents afterwards.
//
// The row is terminated with a newline so that the data rows process appends
// to the same builder start on their own line. Previously the header ended
// with a stray extra "Max" column and no newline, so the first data row was
// glued onto the header line.
func (d *DistinctCountAccuracyProfile) setHeader(sb *strings.Builder) string {
	// One name per column written by process, in the same order.
	// NOTE(review): the nine quantile columns (Min..Max) assume GAUSSIANS_3SD
	// holds nine ranks; confirm against its definition.
	columns := [...]string{
		"TrueU",
		"MeanEst",
		"MeanRelErr",
		"RMS_RE",
		"Trials",
		"Min",
		"Q(.00135)",
		"Q(.02275)",
		"Q(.15866)",
		"Q(.5)",
		"Q(.84134)",
		"Q(.97725)",
		"Q(.99865)",
		"Max",
		"avgLB3",
		"avgLB2",
		"avgLB1",
		"avgUB1",
		"avgUB2",
		"avgUB3",
	}
	sb.WriteString(strings.Join(columns[:], "\t"))
	sb.WriteString("\n")
	return sb.String()
}
// runTrial performs one trial and returns the last key value it consumed.
//
// The sketch is reset, then the stats points are visited in order. For each
// point the sketch is fed just enough previously unused keys (successive
// increments of key) to bring the total fed in this trial up to the point's
// trueValue, and the resulting estimate and bounds are recorded in that point's
// accumulator. Callers pass the returned key to the next trial so that keys
// never repeat across trials.
func (d *DistinctCountAccuracyProfile) runTrial(key uint64) uint64 {
	d.sketch.Reset()

	var fed uint64 // distinct keys fed so far in this trial
	for i := range d.stats {
		point := d.stats[i].(*accuracyStats)

		for pending := point.trueValue - fed; pending > 0; pending-- {
			key++
			d.sketch.UpdateUInt64(key)
		}
		fed = point.trueValue

		// Errors from the sketch getters are ignored, as in the original.
		estimate, _ := d.sketch.GetEstimate()
		lower3, _ := d.sketch.GetLowerBound(3)
		lower2, _ := d.sketch.GetLowerBound(2)
		lower1, _ := d.sketch.GetLowerBound(1)
		upper1, _ := d.sketch.GetUpperBound(1)
		upper2, _ := d.sketch.GetUpperBound(2)
		upper3, _ := d.sketch.GetUpperBound(3)

		point.update(estimate, lower3, lower2, lower1, upper1, upper2, upper3)
	}
	return key
}
// update folds one trial's observations into the accumulator.
//
// est is the sketch's estimate; lb3..lb1 and ub1..ub3 are the lower and upper
// bounds reported alongside it. The estimate is also pushed into the quantiles
// sketch. Note that sumSqRelErr receives the squared absolute error
// (est - trueValue)^2, not a relative one.
func (a *accuracyStats) update(est, lb3, lb2, lb1, ub1, ub2, ub3 float64) {
	a.qsk.Update(est)

	// Bound sums.
	a.sumLB3 += lb3
	a.sumLB2 += lb2
	a.sumLB1 += lb1
	a.sumUB1 += ub1
	a.sumUB2 += ub2
	a.sumUB3 += ub3

	// Estimate and error sums.
	a.sumEst += est
	a.sumRelErr += est/float64(a.trueValue) - 1.0
	absErr := est - float64(a.trueValue)
	a.sumSqRelErr += absErr * absErr
}
// buildLog2AccuracyStatsArray creates one accumulator per unique-count point.
//
// The number of points comes from countPoints(lgMin, lgMax, ppo). The first
// point is 2^lgMin and each following point is whatever pwr2SeriesNext returns
// for the previous one. Every accumulator's quantiles sketch is created with
// k = 2^lgQK.
func buildLog2AccuracyStatsArray(lgMin, lgMax, ppo, lgQK int) []baseAccuracyStats {
	numPoints := countPoints(lgMin, lgMax, ppo)
	points := make([]baseAccuracyStats, numPoints)

	trueU := uint64(1) << lgMin
	for i := range points {
		points[i] = newAccuracyStats(1<<lgQK, trueU)
		trueU = pwr2SeriesNext(ppo, trueU)
	}
	return points
}