go/frequency_long_speed_profile.go (127 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package main import ( "fmt" "github.com/apache/datasketches-go/frequencies" "math" "runtime" "runtime/debug" "strings" "time" ) type FrequencyLongSpeedProfile struct { config frequencyJobConfigType zipf *zipfDistribution startTime int64 stats frequencySpeedStats inputValues []int64 } type frequencySpeedStats struct { buildTimeNs int64 updateTimeNs int64 serializeTimeNs int64 deserializeTimeNs int64 numRetainedItems int64 serializedSizeBytes int64 } func NewFrequencyLongSpeedProfile(config frequencyJobConfigType) *FrequencyLongSpeedProfile { return &FrequencyLongSpeedProfile{ config: config, zipf: newZipfDistribution(int64(config.zipfRange), config.zipfExponent), startTime: time.Now().UnixMilli(), stats: frequencySpeedStats{}, } } func (d *FrequencyLongSpeedProfile) run() { sb := &strings.Builder{} debug.SetGCPercent(-1) debug.SetMemoryLimit(math.MaxInt64) lgMinStreamLen := d.config.lgMin lgMaxStreamLen := d.config.lgMax pointsPerOctave := d.config.PPO lgMinTrials := d.config.lgMinTrials lgMaxTrials := d.config.lgMaxTrials minStreamLen := 1 << lgMinStreamLen maxStreamLen := 1 << lgMaxStreamLen d.setHeader(sb) fmt.Println(sb.String()) streamLength := minStreamLen for streamLength <= maxStreamLen { sb.Reset() numTrials := getNumTrials(streamLength, lgMinStreamLen, lgMaxStreamLen, lgMinTrials, lgMaxTrials) d.resetStats() for i := 0; i < numTrials; i++ { d.prepareTrial(streamLength) d.process() } runtime.GC() d.getStats(streamLength, numTrials, sb) fmt.Println(sb.String()) streamLength = int(pwr2SeriesNext(pointsPerOctave, uint64(streamLength))) } } func (d *FrequencyLongSpeedProfile) process() { startBuild := time.Now().UnixNano() sketch, err := frequencies.NewLongsSketchWithMaxMapSize(d.config.k) if err != nil { panic(err) } stopBuild := time.Now().UnixNano() d.stats.buildTimeNs += stopBuild - startBuild startUpdate := time.Now().UnixNano() for i := 0; i < len(d.inputValues); i++ { err := sketch.Update(d.inputValues[i]) if err != nil { panic(err) } } stopUpdate := time.Now().UnixNano() d.stats.updateTimeNs += stopUpdate - startUpdate startSerialize := time.Now().UnixNano() bytes := sketch.ToSlice() stopSerialize := time.Now().UnixNano() d.stats.serializeTimeNs += stopSerialize - startSerialize startDeserialize := time.Now().UnixNano() _, err = frequencies.NewLongsSketchFromSlice(bytes) if err != nil { panic(err) } stopDeserialize := time.Now().UnixNano() d.stats.deserializeTimeNs += stopDeserialize - startDeserialize d.stats.numRetainedItems += int64(sketch.GetNumActiveItems()) d.stats.serializedSizeBytes += int64(len(bytes)) } func (d *FrequencyLongSpeedProfile) setHeader(sb *strings.Builder) { sb.WriteString("Stream\tTrials\tBuild\tUpdate\tSer\tDeser\tItems\tstatsSize") } func (d *FrequencyLongSpeedProfile) getStats(streamLength int, numTrials int, sb *strings.Builder) { sb.WriteString(fmt.Sprintf("%d\t%d\t%.1f\t%.1f\t%.1f\t%.1f\t%.1f\t%.1f", streamLength, numTrials, float64(d.stats.buildTimeNs)/float64(numTrials), float64(d.stats.updateTimeNs)/float64(numTrials)/float64(streamLength), float64(d.stats.serializeTimeNs)/float64(numTrials), float64(d.stats.deserializeTimeNs)/float64(numTrials), float64(d.stats.numRetainedItems)/float64(numTrials), float64(d.stats.serializedSizeBytes)/float64(numTrials), )) } func (d *FrequencyLongSpeedProfile) resetStats() { d.stats.buildTimeNs = 0 d.stats.updateTimeNs = 0 d.stats.serializeTimeNs = 0 d.stats.deserializeTimeNs = 0 d.stats.numRetainedItems = 0 d.stats.serializedSizeBytes = 0 } func (d *FrequencyLongSpeedProfile) prepareTrial(streamLength int) { // prepare input data d.inputValues = make([]int64, streamLength) for i := 0; i < streamLength; i++ { d.inputValues[i] = d.zipf.sample() } } func getNumTrials(x, lgMinX, lgMaxX, lgMinTrials, lgMaxTrials int) int { slope := float64(lgMaxTrials-lgMinTrials) / float64(lgMinX-lgMaxX) lgX := math.Log(float64(x)) / math.Log(2.0) lgTrials := slope*lgX + float64(lgMaxTrials) return int(math.Pow(2.0, lgTrials)) }