go/distinct_count_serde_profile.go (168 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"fmt"
"github.com/apache/datasketches-go/hll"
"math"
"runtime/debug"
"strings"
"time"
)
// DistinctCountSerDeProfile profiles an HLL sketch's serialization,
// deserialization and estimation times, plus its serialized size, over a
// series of distinct-value counts.
type DistinctCountSerDeProfile struct {
	// config holds the job parameters read by this profile (lgK, the U range
	// lgMinU/lgMaxU/uppo, the trial schedule lgMinT/lgMaxT/lgMinBpU/lgMaxBpU,
	// and the compact-serialization flag).
	config distinctCountJobConfigType
	// sketch is created once by the constructor and is reset and refilled by
	// every trial.
	sketch hll.HllSketch
	// startTime is the creation time in Unix milliseconds.
	// NOTE(review): not read by any code visible here — confirm whether it is
	// used elsewhere.
	startTime int64
}
// distinctCountSerdeStats carries the measurements of a single
// serialize/deserialize trial, or the sum or mean of many trials.
type distinctCountSerdeStats struct {
	// Elapsed nanoseconds spent serializing the sketch, rebuilding it from
	// its serialized image, and computing its estimate, respectively.
	serializeTime_nS, deserializeTime_nS, estimationTime_nS int64
	// Length in bytes of the serialized sketch image.
	size_bytes uint64
}
// NewDistinctCountSerDeProfile creates a serialization/deserialization
// profile whose single HLL sketch is built with config.lgK and tgtType. That
// sketch is reused (reset and refilled) by every trial.
//
// It panics if hll.NewHllSketch rejects the parameters. Otherwise the error
// would be discarded and the unusable sketch would fail later, far from the
// real cause.
func NewDistinctCountSerDeProfile(config distinctCountJobConfigType, tgtType hll.TgtHllType) *DistinctCountSerDeProfile {
	sketch, err := hll.NewHllSketch(config.lgK, tgtType)
	if err != nil {
		panic(fmt.Errorf("creating HLL sketch (lgK=%d): %w", config.lgK, err))
	}
	return &DistinctCountSerDeProfile{
		config:    config,
		sketch:    sketch,
		startTime: time.Now().UnixMilli(),
	}
}
// run executes the profile and prints a tab-separated table to stdout: a
// header row, then one row per target unique count U.
//
// U starts at 2^lgMinU and advances via pwr2SeriesNext until it reaches
// 2^lgMaxU. For each U, getNumTrials(U) trials are run and their stats
// averaged. Keys keep increasing across all trials, so no trial reuses a
// value.
//
// Automatic GC is disabled for the duration of the run. The previous GC
// percent and memory limit are restored when run returns, rather than being
// left disabled for the rest of the process.
func (d *DistinctCountSerDeProfile) run() {
	var (
		sb        = &strings.Builder{}
		vIn       = int64(0)
		rawStats  = &distinctCountSerdeStats{}
		meanStats = &distinctCountSerdeStats{}
		maxU      = 1 << d.config.lgMaxU
		minU      = 1 << d.config.lgMinU
		lastU     = 0
	)
	// Presumably GC is switched off so collection pauses stay out of the timed
	// sections. The inner calls take effect now; the outer calls, deferred,
	// restore the caller's settings on return.
	defer debug.SetGCPercent(debug.SetGCPercent(-1))
	defer debug.SetMemoryLimit(debug.SetMemoryLimit(math.MaxInt64))

	d.setHeader(sb)
	fmt.Println(sb.String())
	sb.Reset()

	for lastU < maxU {
		nextU := minU
		if lastU != 0 {
			nextU = int(pwr2SeriesNext(d.config.uppo, uint64(lastU)))
		}
		lastU = nextU

		// With automatic GC off, per-trial allocations (e.g. serialized slices
		// and rebuilt sketches) would pile up for the entire run. Collect them
		// here, between U points and outside every timed region.
		debug.FreeOSMemory()

		sumStats := &distinctCountSerdeStats{}
		trials := d.getNumTrials(nextU)
		for t := 0; t < trials; t++ {
			vIn = d.runTrial(rawStats, vIn, nextU)
			sumStats.add(rawStats)
		}
		meanStats.makeMeanOf(sumStats, trials)
		d.process(meanStats, trials, nextU, sb)
		fmt.Println(sb.String())
		sb.Reset()
	}
}
// setHeader appends the tab-separated column names of the profile table to
// sb and returns the builder's entire contents.
func (d *DistinctCountSerDeProfile) setHeader(sb *strings.Builder) string {
	columns := []string{"TrueU", "Trials", "Ser_nS", "DeSer_nS", "Est_nS", "Size_B"}
	sb.WriteString(strings.Join(columns, "\t"))
	return sb.String()
}
// runTrial performs one serialize/deserialize trial on the shared sketch.
//
// It resets d.sketch and feeds it uPerTrial consecutive int64 keys, starting
// just after key. It then times three operations:
//   - GetEstimate;
//   - serialization, compact or updatable per d.config.compact;
//   - deserialization with hll.NewHllSketchFromSlice.
//
// The rebuilt sketch must report exactly the same estimate as the original.
// Any error, or an estimate mismatch, panics. The elapsed times and the
// serialized size are written into stats. The last key used is returned so
// the next trial continues from there.
//
// Durations are measured with time.Now/time.Since, which use Go's monotonic
// clock, so wall-clock adjustments cannot produce skewed or negative timings.
func (d *DistinctCountSerDeProfile) runTrial(stats *distinctCountSerdeStats, key int64, uPerTrial int) int64 {
	d.sketch.Reset()
	for u := uPerTrial; u > 0; u-- {
		key++
		d.sketch.UpdateInt64(key)
	}

	start := time.Now()
	est1, err := d.sketch.GetEstimate()
	estimationTime := time.Since(start)
	if err != nil {
		panic(fmt.Errorf("estimating original sketch: %w", err))
	}

	// Choose the serializer before starting the clock so only the call is timed.
	serialize := d.sketch.ToUpdatableSlice
	if d.config.compact {
		serialize = d.sketch.ToCompactSlice
	}
	start = time.Now()
	sketchBytes, err := serialize()
	serializeTime := time.Since(start)
	// Check here: err is reused below, and a serialization failure must not
	// be masked by the deserialization result.
	if err != nil {
		panic(fmt.Errorf("serializing sketch: %w", err))
	}

	start = time.Now()
	sketchRebuild, err := hll.NewHllSketchFromSlice(sketchBytes, true)
	deserializeTime := time.Since(start)
	if err != nil {
		panic(fmt.Errorf("deserializing sketch: %w", err))
	}

	est2, err := sketchRebuild.GetEstimate()
	if err != nil {
		panic(fmt.Errorf("estimating rebuilt sketch: %w", err))
	}
	// A faithful round trip must reproduce the estimate bit-for-bit.
	if est1 != est2 {
		panic("Estimation mismatch")
	}

	stats.serializeTime_nS = serializeTime.Nanoseconds()
	stats.deserializeTime_nS = deserializeTime.Nanoseconds()
	stats.estimationTime_nS = estimationTime.Nanoseconds()
	stats.size_bytes = uint64(len(sketchBytes))
	return key
}
// process appends one result row to sb and returns the builder's entire
// contents. The row holds six tab-separated decimal integers in this order:
// the unique count per trial, the number of trials, then the serialize,
// deserialize and estimation times (ns) and serialized size (bytes) taken
// from stats.
func (d *DistinctCountSerDeProfile) process(stats *distinctCountSerdeStats, trials int, uPerTrial int, sb *strings.Builder) string {
	fmt.Fprintf(sb, "%d\t%d\t%d\t%d\t%d\t%d",
		uPerTrial,
		trials,
		stats.serializeTime_nS,
		stats.deserializeTime_nS,
		stats.estimationTime_nS,
		stats.size_bytes,
	)
	return sb.String()
}
// getNumTrials returns how many trials to run for a trial set of curU uniques.
//
// If lgMinT equals lgMaxT, or curU is at most 2^lgMinBpU, the result is
// 2^lgMaxT. If curU is at least 2^lgMaxBpU, the result is 2^lgMinT. In
// between, log2 of the trial count falls linearly with log2(curU), from
// lgMaxT down to lgMinT, and the power of two is truncated to an int. Larger
// trial sets therefore get fewer trials.
func (d *DistinctCountSerDeProfile) getNumTrials(curU int) int {
	c := &d.config
	switch {
	case c.lgMinT == c.lgMaxT, curU <= 1<<c.lgMinBpU:
		return 1 << c.lgMaxT
	case curU >= 1<<c.lgMaxBpU:
		return 1 << c.lgMinT
	}
	// Line through (lgMinBpU, lgMaxT) and (lgMaxBpU, lgMinT) in log2 space.
	slope := float64(c.lgMaxT-c.lgMinT) / float64(c.lgMinBpU-c.lgMaxBpU)
	lgTrials := slope*(math.Log2(float64(curU))-float64(c.lgMinBpU)) + float64(c.lgMaxT)
	return int(math.Pow(2.0, lgTrials))
}
// add accumulates o into s field by field. The right-hand side is evaluated
// in full before the assignment, so s.add(s) correctly doubles every field.
func (s *distinctCountSerdeStats) add(o *distinctCountSerdeStats) {
	*s = distinctCountSerdeStats{
		serializeTime_nS:   s.serializeTime_nS + o.serializeTime_nS,
		deserializeTime_nS: s.deserializeTime_nS + o.deserializeTime_nS,
		estimationTime_nS:  s.estimationTime_nS + o.estimationTime_nS,
		size_bytes:         s.size_bytes + o.size_bytes,
	}
}
// makeMeanOf sets s to the per-trial mean of the totals in o, i.e. every
// field of o divided by count, truncated toward zero.
//
// Every field uses integer division, as size_bytes always did. The old
// float64 round trip silently lost precision once a sum exceeded 2^53. A
// non-positive count has no meaningful mean; s is reset to the zero value
// instead of panicking on an integer divide-by-zero.
func (s *distinctCountSerdeStats) makeMeanOf(o *distinctCountSerdeStats, count int) {
	if count <= 0 {
		*s = distinctCountSerdeStats{}
		return
	}
	n := int64(count)
	s.serializeTime_nS = o.serializeTime_nS / n
	s.deserializeTime_nS = o.deserializeTime_nS / n
	s.estimationTime_nS = o.estimationTime_nS / n
	s.size_bytes = o.size_bytes / uint64(count)
}