go/distinct_count_merge_speed_profile.go (156 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"fmt"
"github.com/apache/datasketches-go/hll"
"math"
"runtime"
"runtime/debug"
"strings"
"time"
)
// DistinctCountMergeSpeedProfile times merging an HLL sketch into an HLL
// union over a range of lgK values. Each trial can optionally include a
// serialize/deserialize round trip of the sketch before the merge.
type DistinctCountMergeSpeedProfile struct {
	config    distinctCountJobConfigType // profile parameters: lgK range, trial-count exponents, serDe/compact flags
	union     hll.Union                  // merge target; rebuilt by resetMerge for every lgK
	source    hll.HllSketch              // sketch merged into union on each trial; rebuilt by resetMerge
	startTime int64                      // construction time in Unix milliseconds
}
// mergeSpeedStats carries nanosecond timings for the serialize,
// deserialize and merge phases of a trial, plus their total. It holds
// either a single trial's numbers or an average over many trials.
type mergeSpeedStats struct {
	serializeTime_nS, deserializeTime_nS, mergeTime_nS, totalTime_nS float64
}
// NewDistinctCountMergeSpeedProfile returns a merge speed profile driven by
// config. The profile's startTime is recorded as the current time in Unix
// milliseconds.
//
// The union built here is only an initial value: run calls resetMerge,
// which replaces both the union and the source sketch before each lgK is
// profiled. It panics if the union cannot be constructed.
//
// NOTE(review): tgtType is currently unused. resetMerge always builds the
// source sketch with hll.TgtHllTypeDefault.
func NewDistinctCountMergeSpeedProfile(config distinctCountJobConfigType, tgtType hll.TgtHllType) *DistinctCountMergeSpeedProfile {
	union, err := hll.NewUnion(21)
	if err != nil {
		panic(err)
	}
	return &DistinctCountMergeSpeedProfile{
		config:    config,
		union:     union,
		startTime: time.Now().UnixMilli(),
	}
}
// run executes the profile. It prints a tab-separated header row, then one
// row of averaged timings for each lgK from config.minLgK to config.maxLgK
// inclusive.
//
// For each lgK, 2^lgT trials are run with lgT = maxLgK - lgK + lgMinT, so
// smaller sketches get more trials. Automatic garbage collection is
// disabled for the whole run, and a full collection is forced before each
// lgK, so GC pauses do not land inside timed regions. Both GC settings are
// process-wide, so the previous values are restored when run returns.
func (d *DistinctCountMergeSpeedProfile) run() {
	sb := &strings.Builder{}
	d.setHeader(sb)
	fmt.Println(sb.String())

	// The inner calls run now and return the previous settings; the deferred
	// outer calls put those settings back on return.
	defer debug.SetGCPercent(debug.SetGCPercent(-1))
	defer debug.SetMemoryLimit(debug.SetMemoryLimit(math.MaxInt64))

	stats := &mergeSpeedStats{}
	vIn := int64(0)
	for lgK := d.config.minLgK; lgK <= d.config.maxLgK; lgK++ {
		runtime.GC()
		lgT := d.config.maxLgK - lgK + d.config.lgMinT
		trials := 1 << lgT

		// vIn carries across lgK values so every lgK inserts fresh values.
		vIn = d.resetMerge(lgK, vIn)

		var sum mergeSpeedStats
		for t := 0; t < trials; t++ {
			d.runTrial(stats, lgK, d.config.lgDeltaU)
			sum.serializeTime_nS += stats.serializeTime_nS
			sum.deserializeTime_nS += stats.deserializeTime_nS
			sum.mergeTime_nS += stats.mergeTime_nS
			sum.totalTime_nS += stats.totalTime_nS
		}

		n := float64(trials)
		stats.serializeTime_nS = sum.serializeTime_nS / n
		stats.deserializeTime_nS = sum.deserializeTime_nS / n
		stats.mergeTime_nS = sum.mergeTime_nS / n
		stats.totalTime_nS = sum.totalTime_nS / n

		sb.Reset()
		d.process(stats, lgK, lgT, sb)
		fmt.Println(sb.String())
	}
}
// setHeader appends the tab-separated column titles of the results table to
// sb and returns the builder's entire contents.
func (d *DistinctCountMergeSpeedProfile) setHeader(sb *strings.Builder) string {
	columns := []string{
		"LgK",
		"LgT",
		"Ser_nS",
		"DeSer_nS",
		"Merge_nS",
		"Total_nS",
		"PerSlot_nS",
	}
	sb.WriteString(strings.Join(columns, "\t"))
	return sb.String()
}
// runTrial performs one timed merge of d.source into d.union and writes the
// elapsed times, in nanoseconds, into stats.
//
// When d.config.serDe is set, the trial first serializes d.source. The form
// is compact when d.config.compact is set, updatable otherwise. It then
// deserializes the bytes into a new sketch and merges that copy. Otherwise
// d.source is merged directly, and the serialize/deserialize times are zero.
// totalTime_nS is the sum of the three phases.
//
// Durations come from the monotonic clock (time.Since), so wall-clock
// adjustments cannot distort them. lgK and lgDeltaU are accepted for
// signature compatibility but are not used. Any sketch error panics.
func (d *DistinctCountMergeSpeedProfile) runTrial(stats *mergeSpeedStats, lgK int, lgDeltaU int) {
	var serTime, deserTime time.Duration
	source := d.source
	if d.config.serDe {
		var (
			byteArr []byte
			err     error
		)
		// Serialize
		start := time.Now()
		if d.config.compact {
			byteArr, err = d.source.ToCompactSlice()
		} else {
			byteArr, err = d.source.ToUpdatableSlice()
		}
		serTime = time.Since(start)
		if err != nil {
			panic(err)
		}
		// Deserialize into a new sketch; that copy is what gets merged below.
		start = time.Now()
		source, err = hll.NewHllSketchFromSlice(byteArr, true)
		deserTime = time.Since(start)
		if err != nil {
			panic(err)
		}
	}
	// Merge
	start := time.Now()
	err := d.union.UpdateSketch(source)
	mergeTime := time.Since(start)
	if err != nil {
		panic(err)
	}
	stats.serializeTime_nS = float64(serTime.Nanoseconds())
	stats.deserializeTime_nS = float64(deserTime.Nanoseconds())
	stats.mergeTime_nS = float64(mergeTime.Nanoseconds())
	// Previously this was the merge time alone, which made the Total_nS column
	// ignore serialize/deserialize cost whenever serDe was enabled.
	stats.totalTime_nS = float64((serTime + deserTime + mergeTime).Nanoseconds())
}
// process appends one tab-separated result row for the given lgK to sb and
// returns the builder's entire contents. The columns are lgK, lgT, the
// serialize, deserialize, merge and total times, and the total time divided
// by the slot count 2^lgK. Times are printed in %e notation.
func (d *DistinctCountMergeSpeedProfile) process(stats *mergeSpeedStats, lgK int, lgT int, sb *strings.Builder) string {
	slots := float64(uint64(1) << lgK)
	fmt.Fprintf(sb, "%d\t%d\t%e\t%e\t%e\t%e\t%e",
		lgK,
		lgT,
		stats.serializeTime_nS,
		stats.deserializeTime_nS,
		stats.mergeTime_nS,
		stats.totalTime_nS,
		stats.totalTime_nS/slots,
	)
	return sb.String()
}
// resetMerge replaces d.union and d.source with fresh instances built for
// lgK. It then loads both with the same 2<<lgK consecutive values vIn+1,
// vIn+2, .... It returns the last value inserted, so the next call can
// continue from there without reusing values. Any sketch error panics;
// previously errors were dropped and a failed constructor left a nil
// interface behind.
//
// NOTE(review): the source sketch always uses hll.TgtHllTypeDefault; the
// tgtType argument of NewDistinctCountMergeSpeedProfile is not consulted.
func (d *DistinctCountMergeSpeedProfile) resetMerge(lgK int, vIn int64) int64 {
	union, err := hll.NewUnion(lgK)
	if err != nil {
		panic(err)
	}
	source, err := hll.NewHllSketch(lgK, hll.TgtHllTypeDefault)
	if err != nil {
		panic(err)
	}
	d.union, d.source = union, source

	// 2<<lgK is twice the nominal size K = 1<<lgK. Presumably this is chosen so
	// both sketches hold well over K distinct values; confirm the intent.
	n := 2 << lgK
	for i := 0; i < n; i++ {
		vIn++
		if err := d.union.UpdateInt64(vIn); err != nil {
			panic(err)
		}
		if err := d.source.UpdateInt64(vIn); err != nil {
			panic(err)
		}
	}
	return vIn
}