go/distinct_count_accuracy_profile.go (236 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package main import ( "fmt" "github.com/apache/datasketches-go/common" "github.com/apache/datasketches-go/hll" "github.com/apache/datasketches-go/kll" "math" "strings" "time" ) type DistinctCountAccuracyProfile struct { config distinctCountJobConfigType sketch hll.HllSketch stats []baseAccuracyStats startTime int64 } type accuracyStats struct { qsk *kll.ItemsSketch[float64] sumLB3 float64 sumLB2 float64 sumLB1 float64 sumUB3 float64 sumUB2 float64 sumUB1 float64 sumEst float64 sumRelErr float64 sumSqRelErr float64 trueValue uint64 } func NewDistinctCountAccuracyProfile(config distinctCountJobConfigType, tgtType hll.TgtHllType) *DistinctCountAccuracyProfile { sketch, _ := hll.NewHllSketch(config.lgK, tgtType) return &DistinctCountAccuracyProfile{ config: config, sketch: sketch, stats: buildLog2AccuracyStatsArray(config.lgMinU, config.lgMaxU, config.uppo, config.lgQK), startTime: time.Now().UnixMilli(), } } func newAccuracyStats(k int, trueValue uint64) *accuracyStats { qsk, _ := kll.NewKllItemsSketch[float64](uint16(k), 8, common.ArrayOfDoublesSerDe{}) return &accuracyStats{ qsk: qsk, trueValue: trueValue, } } func (d *DistinctCountAccuracyProfile) run() { minT := 1 << d.config.lgMinT maxT := 1 << d.config.lgMaxT maxU := 1 << d.config.lgMaxU vIn := uint64(0) // This will generate a table of data for each intermediate Trials point lastTpt := 0 for lastTpt < maxT { nextT := lastTpt if lastTpt == 0 { nextT = minT } else { nextT = int(pwr2SeriesNext(d.config.tppo, uint64(lastTpt))) } delta := nextT - lastTpt for i := 0; i < delta; i++ { vIn = d.runTrial(vIn) } lastTpt = nextT sb := &strings.Builder{} if nextT < maxT { if d.config.interData { sb.Reset() d.setHeader(sb) d.process(lastTpt, sb) fmt.Println(sb.String()) } } else { sb.Reset() d.setHeader(sb) d.process(lastTpt, sb) fmt.Println(sb.String()) } fmt.Printf("Config: : %+v\n", d.config) fmt.Printf("Cum Trials : %d\n", lastTpt) fmt.Printf("Cum Updates : %d\n", vIn) currentTime_mS := time.Now().UnixMilli() cumTime_mS := currentTime_mS - d.startTime fmt.Printf("Cum Time : %s\n", time.Duration(cumTime_mS*1000*1000)) timePerTrial_mS := float64(cumTime_mS) / float64(lastTpt) avgUpdateTime_ns := timePerTrial_mS * 1e6 / float64(maxU) fmt.Printf("Time Per Trial, mSec: %f\n", timePerTrial_mS) fmt.Printf("Avg Update Time, nSec: %f\n", avgUpdateTime_ns) fmt.Printf("Date Time : %s\n", time.Now().Format(time.RFC3339)) timeToComplete_mS := int64(timePerTrial_mS * float64(maxT-lastTpt)) fmt.Printf("Est Time to Complete: %s\n", time.Duration(timeToComplete_mS*1000*1000)) fmt.Printf("Est Time at Completion: %s\n", time.Now().Add(time.Duration(timeToComplete_mS*1000*1000)).Format(time.RFC3339)) fmt.Println("") } } func (d *DistinctCountAccuracyProfile) process(cumTrials int, sb *strings.Builder) { points := len(d.stats) for pt := 0; pt < points; pt++ { q := d.stats[pt].(*accuracyStats) trueUniques := q.trueValue meanEst := q.sumEst / float64(cumTrials) meanRelErr := q.sumRelErr / float64(cumTrials) meanSqErr := q.sumSqRelErr / float64(cumTrials) normMeanSqErr := meanSqErr / (float64(trueUniques) * float64(trueUniques)) rmsRelErr := math.Sqrt(normMeanSqErr) relLb3 := q.sumLB3/float64(cumTrials)/float64(trueUniques) - 1.0 relLb2 := q.sumLB2/float64(cumTrials)/float64(trueUniques) - 1.0 relLb1 := q.sumLB1/float64(cumTrials)/float64(trueUniques) - 1.0 relUb1 := q.sumUB1/float64(cumTrials)/float64(trueUniques) - 1.0 relUb2 := q.sumUB2/float64(cumTrials)/float64(trueUniques) - 1.0 relUb3 := q.sumUB3/float64(cumTrials)/float64(trueUniques) - 1.0 sb.WriteString(fmt.Sprintf("%d", trueUniques)) sb.WriteString("\t") sb.WriteString(fmt.Sprintf("%e", meanEst)) sb.WriteString("\t") sb.WriteString(fmt.Sprintf("%e", meanRelErr)) sb.WriteString("\t") sb.WriteString(fmt.Sprintf("%e", rmsRelErr)) sb.WriteString("\t") sb.WriteString(fmt.Sprintf("%d", cumTrials)) sb.WriteString("\t") // Quantiles quants, _ := q.qsk.GetQuantiles(GAUSSIANS_3SD, true) for i := 0; i < len(quants); i++ { sb.WriteString(fmt.Sprintf("%e", quants[i]/float64(trueUniques)-1.0)) sb.WriteString("\t") } // Bound averages sb.WriteString(fmt.Sprintf("%e", relLb3)) sb.WriteString("\t") sb.WriteString(fmt.Sprintf("%e", relLb2)) sb.WriteString("\t") sb.WriteString(fmt.Sprintf("%e", relLb1)) sb.WriteString("\t") sb.WriteString(fmt.Sprintf("%e", relUb1)) sb.WriteString("\t") sb.WriteString(fmt.Sprintf("%e", relUb2)) sb.WriteString("\t") sb.WriteString(fmt.Sprintf("%e", relUb3)) sb.WriteString("\n") } } func (d *DistinctCountAccuracyProfile) setHeader(sb *strings.Builder) string { sb.WriteString("TrueU") sb.WriteString("\t") sb.WriteString("MeanEst") sb.WriteString("\t") sb.WriteString("MeanRelErr") sb.WriteString("\t") sb.WriteString("RMS_RE") sb.WriteString("\t") sb.WriteString("Trials") sb.WriteString("\t") sb.WriteString("Min") sb.WriteString("\t") sb.WriteString("Q(.00135)") sb.WriteString("\t") sb.WriteString("Q(.02275)") sb.WriteString("\t") sb.WriteString("Q(.15866)") sb.WriteString("\t") sb.WriteString("Q(.5)") sb.WriteString("\t") sb.WriteString("Q(.84134)") sb.WriteString("\t") sb.WriteString("Q(.97725)") sb.WriteString("\t") sb.WriteString("Q(.99865)") sb.WriteString("\t") sb.WriteString("Max") sb.WriteString("\t") sb.WriteString("avgLB3") sb.WriteString("\t") sb.WriteString("avgLB2") sb.WriteString("\t") sb.WriteString("avgLB1") sb.WriteString("\t") sb.WriteString("avgUB1") sb.WriteString("\t") sb.WriteString("avgUB2") sb.WriteString("\t") sb.WriteString("avgUB3") sb.WriteString("\t") sb.WriteString("Max") return sb.String() } func (d *DistinctCountAccuracyProfile) runTrial(key uint64) uint64 { d.sketch.Reset() lastUniques := uint64(0) for _, ostat := range d.stats { stat := ostat.(*accuracyStats) delta := stat.trueValue - lastUniques for u := uint64(0); u < delta; u++ { key++ d.sketch.UpdateUInt64(key) } lastUniques += delta est, _ := d.sketch.GetEstimate() lb3, _ := d.sketch.GetLowerBound(3) lb2, _ := d.sketch.GetLowerBound(2) lb1, _ := d.sketch.GetLowerBound(1) ub1, _ := d.sketch.GetUpperBound(1) ub2, _ := d.sketch.GetUpperBound(2) ub3, _ := d.sketch.GetUpperBound(3) stat.update(est, lb3, lb2, lb1, ub1, ub2, ub3) } return key } func (a *accuracyStats) update( est float64, lb3 float64, lb2 float64, lb1 float64, ub1 float64, ub2 float64, ub3 float64, ) { a.qsk.Update(est) a.sumLB3 += lb3 a.sumLB2 += lb2 a.sumLB1 += lb1 a.sumUB1 += ub1 a.sumUB2 += ub2 a.sumUB3 += ub3 a.sumEst += est a.sumRelErr += est/float64(a.trueValue) - 1.0 erro := est - float64(a.trueValue) a.sumSqRelErr += erro * erro } func buildLog2AccuracyStatsArray(lgMin, lgMax, ppo, lgQK int) []baseAccuracyStats { qLen := countPoints(lgMin, lgMax, ppo) qArr := make([]baseAccuracyStats, qLen) p := uint64(1) << lgMin for i := 0; i < qLen; i++ { qArr[i] = newAccuracyStats(1<<lgQK, p) p = pwr2SeriesNext(ppo, p) } return qArr }