cpc/streaming_validation.go (155 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package cpc import ( "fmt" "io" "math" "github.com/apache/datasketches-go/common" "github.com/apache/datasketches-go/internal" ) // StreamingValidation is a test/characterization harness that repeatedly // updates a CPC sketch and a BitMatrix, checks their correctness, and logs results. type StreamingValidation struct { // Config inputs lgMinK int lgMaxK int trials int ppoN int printStream io.Writer printWriter io.Writer // Internal formatting for table columns hfmt string dfmt string hStrArr []string // Internal state vIn uint64 // increments each update sketch *CpcSketch matrix *BitMatrix } func NewStreamingValidation( lgMinK, lgMaxK, trials, ppoN int, pS, pW io.Writer, ) *StreamingValidation { sv := &StreamingValidation{ lgMinK: lgMinK, lgMaxK: lgMaxK, trials: trials, ppoN: ppoN, printStream: pS, printWriter: pW, } sv.assembleStrings() return sv } // Start begins the streaming validation process, printing column headers and running the test loops. func (sv *StreamingValidation) Start() { sv.printf(sv.hfmt, sv.stringArrayToInterface(sv.hStrArr)...) sv.doRangeOfLgK() } // doRangeOfLgK loops from lgMinK to lgMaxK inclusive. func (sv *StreamingValidation) doRangeOfLgK() { for lgK := sv.lgMinK; lgK <= sv.lgMaxK; lgK++ { sv.doRangeOfNAtLgK(lgK) } } // doRangeOfNAtLgK loops over n from 1 up to 64 * (1 << lgK), // stepping in a power-series style (ppoN increments). func (sv *StreamingValidation) doRangeOfNAtLgK(lgK int) { var n int64 = 1 maxN := int64(64) * (1 << lgK) for n < maxN { sv.doTrialsAtLgKAtN(lgK, n) // Use powerSeriesNextDouble to pick the next n. n = int64(math.Round(common.PowerSeriesNextDouble(sv.ppoN, float64(n), true, 2.0))) } } // doTrialsAtLgKAtN performs the configured number of trials at a given lgK and n. func (sv *StreamingValidation) doTrialsAtLgKAtN(lgK int, n int64) { var sumC, sumIconEst, sumHipEst float64 // We'll create sketches once outside the loop, but we reset them each trial. // Also create a BitMatrix to compare the set bits. sketch, _ := NewCpcSketch(lgK, internal.DEFAULT_UPDATE_SEED) matrix := NewBitMatrixWithSeed(lgK, internal.DEFAULT_UPDATE_SEED) for t := 0; t < sv.trials; t++ { sketch.reset() matrix.Reset() for i := int64(0); i < n; i++ { sv.vIn += common.InverseGoldenU64 in := sv.vIn // Update the CPC sketch _ = sketch.UpdateUint64(in) // Update the BitMatrix matrix.Update(int64(in)) } // Accumulate sums sumC += float64(sketch.numCoupons) sumIconEst += iconEstimate(lgK, sketch.numCoupons) sumHipEst += sketch.hipEstAccum // Check that the number of coupons matches the matrix if matrix.GetNumCoupons() != sketch.numCoupons { panic(fmt.Sprintf("Mismatch in numCoupons: bitMatrix=%d, cpcSketch=%d", matrix.GetNumCoupons(), sketch.numCoupons)) } // Check that the actual bit matrix matches bitMat, err := sketch.bitMatrixOfSketch() if err != nil { panic(fmt.Sprintf("bitMatrixOfSketch error: %v", err)) } mat2 := matrix.GetMatrix() // Compare row by row if len(bitMat) != len(mat2) { panic(fmt.Sprintf("Mismatch: bitMatrixOfSketch len=%d, matrix.GetMatrix len=%d", len(bitMat), len(mat2))) } for i := range bitMat { if bitMat[i] != mat2[i] { panic(fmt.Sprintf("Mismatch at row %d: bitMat=%x, mat2=%x", i, bitMat[i], mat2[i])) } } } // final state from the last trial finC := sketch.numCoupons finFlavor := sketch.getFlavor() finOff := sketch.windowOffset avgC := sumC / float64(sv.trials) avgIconEst := sumIconEst / float64(sv.trials) avgHipEst := sumHipEst / float64(sv.trials) // Print the row sv.printf( sv.dfmt, lgK, sv.trials, n, finC, finFlavor.String(), finOff, avgC, avgIconEst, avgHipEst, ) } func (sv *StreamingValidation) assembleStrings() { // columns: name, headerFormat, dataFormat columns := []struct { name string headerFmt string dataFmt string }{ {"lgK", "%3s", "%3d"}, {"Trials", "%7s", "%7d"}, {"n", "%8s", "%8d"}, {"FinC", "%8s", "%8d"}, {"FinFlavor", "%10s", "%10s"}, {"FinOff", "%7s", "%7d"}, {"AvgC", "%12s", "%12.3f"}, {"AvgICON", "%12s", "%12.3f"}, {"AvgHIP", "%12s", "%12.3f"}, } sv.hStrArr = make([]string, len(columns)) // Build a single line for the header format, and one for the data format. headerLine := "\nStreaming Validation\n" dataLine := "" for i, col := range columns { sv.hStrArr[i] = col.name // Add a tab or line break sep := "\t" if i == len(columns)-1 { sep = "\n" } headerLine += fmt.Sprintf(col.headerFmt, col.name) + sep dataLine += col.dataFmt dataLine += sep } sv.hfmt = headerLine sv.dfmt = dataLine } // printf writes to both printStream and printWriter if non-nil. func (sv *StreamingValidation) printf(format string, args ...interface{}) { if sv.printStream != nil { fmt.Fprintf(sv.printStream, format, args...) } if sv.printWriter != nil { fmt.Fprintf(sv.printWriter, format, args...) } } // stringArrayToInterface helps pass a []string to a varargs function (like Fprintf). func (sv *StreamingValidation) stringArrayToInterface(ss []string) []interface{} { ii := make([]interface{}, len(ss)) for i := range ss { ii[i] = ss[i] } return ii }