cpc/compression_characterization.go (298 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package cpc import ( "fmt" "io" "math" "math/bits" "time" "github.com/apache/datasketches-go/common" "github.com/apache/datasketches-go/internal" ) // CompressionCharacterization is a harness that tests compression performance // for various log(K) and N, measuring the time spent in creation, updates, compression, // serialization, deserialization, uncompression, and equality checks. type CompressionCharacterization struct { // Inputs lgMinK int lgMaxK int lgMinT int lgMaxT int lgMulK int uPPO int incLgK int printStr io.Writer printWtr io.Writer // Internal formatting for the table hfmt string dfmt string hStrArr []string // A running counter for updates vIn uint64 } func NewCompressionCharacterization( lgMinK, lgMaxK, lgMinT, lgMaxT, lgMulK, uPPO, incLgK int, pS, pW io.Writer, ) *CompressionCharacterization { // Ensure some parameters are at least 1 if uPPO < 1 { uPPO = 1 } if incLgK < 1 { incLgK = 1 } cc := &CompressionCharacterization{ lgMinK: lgMinK, lgMaxK: lgMaxK, lgMinT: lgMinT, lgMaxT: lgMaxT, lgMulK: lgMulK, uPPO: uPPO, incLgK: incLgK, printStr: pS, printWtr: pW, } cc.assembleFormats() return cc } // Start runs the entire characterization, printing the header first. func (cc *CompressionCharacterization) Start() error { cc.printf(cc.hfmt, cc.toInterfaceSlice(cc.hStrArr)...) return cc.doRangeOfLgK() } // doRangeOfLgK loops from lgMinK to lgMaxK in steps of incLgK func (cc *CompressionCharacterization) doRangeOfLgK() error { for lgK := cc.lgMinK; lgK <= cc.lgMaxK; lgK += cc.incLgK { err := cc.doRangeOfNAtLgK(lgK) if err != nil { return err } } return nil } // doRangeOfNAtLgK iterates over n up to 2^(lgK + lgMulK) func (cc *CompressionCharacterization) doRangeOfNAtLgK(lgK int) error { var n int64 = 1 lgMaxN := lgK + cc.lgMulK maxN := int64(1 << lgMaxN) // The slope for computing total trials: // slope = -(lgMaxT - lgMinT) / (lgMaxN) slope := float64(-(cc.lgMaxT - cc.lgMinT)) / float64(lgMaxN) for n <= maxN { // totalTrials = 2^( (slope * log2(n)) + lgMaxT ), clamped to at least 2^lgMinT // we do a partial linear interpolation in log space lgT := (slope * math.Log2(float64(n))) + float64(cc.lgMaxT) // Round up to the next power of two, but also at least 2^lgMinT totTrials := common.CeilingPowerOf2(int(math.Round(math.Pow(2.0, lgT)))) minTrials := 1 << cc.lgMinT if totTrials < minTrials { totTrials = minTrials } err := cc.doTrialsAtLgKAtN(lgK, n, totTrials) if err != nil { return err } // step n using powerSeriesNextDouble with base 2.0 newN := common.PowerSeriesNextDouble(cc.uPPO, float64(n), true, 2.0) n = int64(math.Round(newN)) } return nil } // doTrialsAtLgKAtN runs the wave-based test for a given (lgK, n, totalTrials) func (cc *CompressionCharacterization) doTrialsAtLgKAtN(lgK int, n int64, totalTrials int) error { k := 1 << lgK minNK := k if int64(k) > n { minNK = int(n) } nOverK := float64(n) / float64(k) lgTotTrials := bits.TrailingZeros32(uint32(totalTrials)) // We'll define waves = 2^(lgWaves). Each wave has trialsPerWave = 2^(lgTotTrials - lgWaves). lgWaves := lgTotTrials - 10 if lgWaves < 0 { lgWaves = 0 } trialsPerWave := 1 << (lgTotTrials - lgWaves) wavesCount := 1 << lgWaves streamSketches := make([]*CpcSketch, trialsPerWave) compressedStates1 := make([]*CpcCompressedState, trialsPerWave) memoryArr := make([][]byte, trialsPerWave) compressedStates2 := make([]*CpcCompressedState, trialsPerWave) unCompressedSketches := make([]*CpcSketch, trialsPerWave) var totalC, totalW int64 var sumCtorNS, sumUpdNS, sumComNS, sumSerNS, sumDesNS, sumUncNS, sumEquNS int64 startTime := time.Now() // wave loop for w := 0; w < wavesCount; w++ { // Construct sketches nanoStart := time.Now().UnixNano() for t := 0; t < trialsPerWave; t++ { sketch, err := NewCpcSketch(lgK, internal.DEFAULT_UPDATE_SEED) if err != nil { return err } streamSketches[t] = sketch } nanoEnd := time.Now().UnixNano() sumCtorNS += nanoEnd - nanoStart nanoStart = nanoEnd // Update each sketch for t := 0; t < trialsPerWave; t++ { sketch := streamSketches[t] for i := int64(0); i < n; i++ { cc.vIn += common.InverseGoldenU64 _ = sketch.UpdateUint64(cc.vIn) } } nanoEnd = time.Now().UnixNano() sumUpdNS += nanoEnd - nanoStart nanoStart = nanoEnd // Compress each sketch for t := 0; t < trialsPerWave; t++ { sketch := streamSketches[t] state, err := NewCpcCompressedStateFromSketch(sketch) if err != nil { panic(fmt.Sprintf("Compression error: %v", err)) } compressedStates1[t] = state totalC += int64(sketch.numCoupons) // approximate measure of total words in CSV + CW totalW += int64(state.CsvLengthInts + state.CwLengthInts) } nanoEnd = time.Now().UnixNano() sumComNS += nanoEnd - nanoStart nanoStart = nanoEnd // Convert each CompressedState to a byte slice for t := 0; t < trialsPerWave; t++ { state := compressedStates1[t] mem, err := state.exportToMemory() if err != nil { panic(fmt.Sprintf("exportToMemory error: %v", err)) } memoryArr[t] = mem } nanoEnd = time.Now().UnixNano() sumSerNS += nanoEnd - nanoStart nanoStart = nanoEnd // Import from memory to new CompressedState for t := 0; t < trialsPerWave; t++ { mem := memoryArr[t] state, err := importFromMemory(mem) if err != nil { panic(fmt.Sprintf("importFromMemory error: %v", err)) } compressedStates2[t] = state } nanoEnd = time.Now().UnixNano() sumDesNS += nanoEnd - nanoStart nanoStart = nanoEnd // Uncompress into a new CpcSketch for t := 0; t < trialsPerWave; t++ { state := compressedStates2[t] uncSk, err := uncompressSketch(state, internal.DEFAULT_UPDATE_SEED) if err != nil { return err } unCompressedSketches[t] = uncSk } nanoEnd = time.Now().UnixNano() sumUncNS += nanoEnd - nanoStart nanoStart = nanoEnd // Equality check for t := 0; t < trialsPerWave; t++ { s1 := streamSketches[t] s2 := unCompressedSketches[t] if !specialEquals(s1, s2, false, false) { return fmt.Errorf("uncompressed sketch not equal to original") } } nanoEnd = time.Now().UnixNano() sumEquNS += nanoEnd - nanoStart nanoStart = nanoEnd } totalSeconds := time.Since(startTime).Seconds() avgC := float64(totalC) / float64(totalTrials) avgCoK := avgC / float64(k) avgWords := float64(totalW) / float64(totalTrials) avgBytes := 4.0 * avgWords // 4 bytes per int // compute average times // Each sum is total for all waves, so we divide by totalTrials avgCtor := float64(sumCtorNS) / float64(totalTrials) avgUpd := float64(sumUpdNS) / float64(totalTrials) avgCom := float64(sumComNS) / float64(totalTrials) avgSer := float64(sumSerNS) / float64(totalTrials) avgDes := float64(sumDesNS) / float64(totalTrials) avgUnc := float64(sumUncNS) / float64(totalTrials) avgEqu := float64(sumEquNS) / float64(totalTrials) avgUpdPerN := avgUpd / float64(n) avgComPer2C := avgCom / (2.0 * avgC) avgComPerK := avgCom / float64(k) avgSerPerW := avgSer / avgWords avgDesPerW := avgDes / avgWords avgUncPer2C := avgUnc / (2.0 * avgC) avgUncPerK := avgUnc / float64(k) avgEquPerMinNK := avgEqu / float64(minNK) // final flavor/offset from last wave lastSketch := unCompressedSketches[len(unCompressedSketches)-1] finFlavor := lastSketch.getFlavor() finOff := lastSketch.windowOffset flavorOff := fmt.Sprintf("%s%2d", finFlavor.String(), finOff) // Print final line cc.printf( cc.dfmt, lgK, totalTrials, n, minNK, avgCoK, flavorOff, nOverK, avgBytes, avgCtor, avgUpd, avgCom, avgSer, avgDes, avgUnc, avgEqu, avgUpdPerN, avgComPer2C, avgComPerK, avgSerPerW, avgDesPerW, avgUncPer2C, avgUncPerK, avgEquPerMinNK, totalSeconds, ) return nil } // assembleFormats sets up the column headers & format strings for the final output. func (cc *CompressionCharacterization) assembleFormats() { columns := []struct { name string headerFmt string dataFmt string }{ {"lgK", "%3s", "%3d"}, {"Trials", "%9s", "%9d"}, {"n", "%12s", "%12d"}, {"MinKN", "%9s", "%9d"}, {"AvgC/K", "%9s", "%9.4g"}, {"FinFlavor", "%11s", "%11s"}, {"N/K", "%9s", "%9.4g"}, {"AvgBytes", "%9s", "%9.0f"}, {"AvgCtor_nS", "%11s", "%11.0f"}, {"AvgUpd_nS", "%10s", "%10.4e"}, {"AvgCom_nS", "%10s", "%10.0f"}, {"AvgSer_nS", "%10s", "%10.2f"}, {"AvgDes_nS", "%10s", "%10.2f"}, {"AvgUnc_nS", "%10s", "%10.0f"}, {"AvgEqu_nS", "%10s", "%10.0f"}, {"AvgUpd_nSperN", "%14s", "%14.2f"}, {"AvgCom_nSper2C", "%15s", "%15.4g"}, {"AvgCom_nSperK", "%14s", "%14.4g"}, {"AvgSer_nSperW", "%14s", "%14.2f"}, {"AvgDes_nSperW", "%14s", "%14.2f"}, {"AvgUnc_nSper2C", "%15s", "%15.4g"}, {"AvgUnc_nSperK", "%14s", "%14.4g"}, {"AvgEqu_nSperMinNK", "%18s", "%18.4g"}, {"Total_S", "%8s", "%8.3f"}, } cc.hStrArr = make([]string, len(columns)) headerLine := "\nCompression Characterization\n" dataLine := "" for i, col := range columns { cc.hStrArr[i] = col.name sep := "\t" if i == len(columns)-1 { sep = "\n" } headerLine += fmt.Sprintf(col.headerFmt, col.name) + sep dataLine += col.dataFmt + sep } cc.hfmt = headerLine cc.dfmt = dataLine } // printf writes to both outputs if they exist func (cc *CompressionCharacterization) printf(format string, args ...interface{}) { if cc.printStr != nil { fmt.Fprintf(cc.printStr, format, args...) } if cc.printWtr != nil { fmt.Fprintf(cc.printWtr, format, args...) } } // toInterfaceSlice helps pass a slice of strings to fmt.Fprintf for the header. func (cc *CompressionCharacterization) toInterfaceSlice(ss []string) []interface{} { out := make([]interface{}, len(ss)) for i := range ss { out[i] = ss[i] } return out }