hll/union.go (367 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package hll import ( "fmt" "github.com/apache/datasketches-go/internal" ) type Union interface { // Updatable UpdateUInt64(datum uint64) error UpdateInt64(datum int64) error UpdateSlice(datum []byte) error UpdateString(datum string) error Reset() error // Estimable GetCompositeEstimate() (float64, error) GetEstimate() (float64, error) //GetHipEstimate() (float64, error) GetLowerBound(numStdDev int) (float64, error) GetUpperBound(numStdDev int) (float64, error) IsEmpty() bool GetLgConfigK() int GetTgtHllType() TgtHllType GetCurMode() curMode GetUpdatableSerializationBytes() int ToCompactSlice() ([]byte, error) ToUpdatableSlice() ([]byte, error) UpdateSketch(sketch HllSketch) error GetResult(tgtHllType TgtHllType) (HllSketch, error) couponUpdate(coupon int) (hllSketchStateI, error) iterator() pairIterator } type unionImpl struct { lgMaxK int gadget HllSketch } func (u *unionImpl) iterator() pairIterator { return u.gadget.iterator() } //func (u *unionImpl) GetHipEstimate() (float64, error) { // return u.gadget.GetHipEstimate() //} func (u *unionImpl) GetUpperBound(numStdDev int) (float64, error) { return u.gadget.GetUpperBound(numStdDev) } func (u *unionImpl) GetLowerBound(numStdDev int) (float64, error) { return u.gadget.GetLowerBound(numStdDev) } func (u *unionImpl) couponUpdate(coupon int) (hllSketchStateI, error) { if coupon == empty { return u.gadget.(*hllSketchState).sketch, nil } sk, err := u.gadget.couponUpdate(coupon) u.gadget.(*hllSketchState).sketch = sk return sk, err } func (u *unionImpl) GetResult(tgtHllType TgtHllType) (HllSketch, error) { err := checkRebuildCurMinNumKxQ(u.gadget) if err != nil { return nil, err } return u.gadget.CopyAs(tgtHllType) } func NewUnionWithDefault() (Union, error) { return NewUnion(defaultLgK) } func NewUnion(lgMaxK int) (Union, error) { sk, err := NewHllSketch(lgMaxK, TgtHllTypeHll8) if err != nil { return nil, err } return &unionImpl{ lgMaxK: lgMaxK, gadget: sk, }, nil } func NewUnionFromSlice(byteArray []byte) (Union, error) { lgK, err := checkLgK(extractLgK(byteArray)) if err != nil { return nil, err } sk, e := NewHllSketchFromSlice(byteArray, false) if e != nil { return nil, e } union, err := NewUnion(lgK) if err != nil { return nil, err } err = union.UpdateSketch(sk) return union, err } func (u *unionImpl) GetCompositeEstimate() (float64, error) { return u.gadget.GetCompositeEstimate() } func (u *unionImpl) GetEstimate() (float64, error) { return u.gadget.GetEstimate() } func (u *unionImpl) UpdateUInt64(datum uint64) error { return u.gadget.UpdateUInt64(datum) } func (u *unionImpl) UpdateInt64(datum int64) error { return u.gadget.UpdateInt64(datum) } func (u *unionImpl) UpdateSlice(datum []byte) error { return u.gadget.UpdateSlice(datum) } func (u *unionImpl) UpdateString(datum string) error { return u.gadget.UpdateString(datum) } func (u *unionImpl) UpdateSketch(sketch HllSketch) error { un, err := u.unionImpl(sketch) if err != nil { return err } u.gadget.(*hllSketchState).sketch = un return nil } func (u *unionImpl) GetLgConfigK() int { return u.gadget.GetLgConfigK() } func (u *unionImpl) GetTgtHllType() TgtHllType { return u.gadget.GetTgtHllType() } func (u *unionImpl) GetCurMode() curMode { return u.gadget.GetCurMode() } func (u *unionImpl) IsEmpty() bool { return u.gadget.IsEmpty() } func (u *unionImpl) ToCompactSlice() ([]byte, error) { err := checkRebuildCurMinNumKxQ(u.gadget) if err != nil { return nil, err } return u.gadget.ToCompactSlice() } func (u *unionImpl) ToUpdatableSlice() ([]byte, error) { err := checkRebuildCurMinNumKxQ(u.gadget) if err != nil { return nil, err } return u.gadget.ToUpdatableSlice() } func (u *unionImpl) GetUpdatableSerializationBytes() int { return u.gadget.GetUpdatableSerializationBytes() } func (u *unionImpl) Reset() error { return u.gadget.Reset() } func (u *unionImpl) unionImpl(source HllSketch) (hllSketchStateI, error) { if u.gadget.GetTgtHllType() != TgtHllTypeHll8 { return nil, fmt.Errorf("gadget must be HLL_8") } if source == nil || source.IsEmpty() { return u.gadget.(*hllSketchState).sketch, nil } gadgetC := u.gadget.(*hllSketchState) sourceC := source.(*hllSketchState) srcMode := sourceC.sketch.GetCurMode() if srcMode == curModeList { err := sourceC.mergeTo(u.gadget) return u.gadget.(*hllSketchState).sketch, err } srcLgK := source.GetLgConfigK() gdgtLgK := u.gadget.GetLgConfigK() gdgtEmpty := u.gadget.IsEmpty() if srcMode == curModeSet { if gdgtEmpty && srcLgK == gdgtLgK { un, err := sourceC.CopyAs(TgtHllTypeHll8) gadgetC.sketch = un.(*hllSketchState).sketch return gadgetC.sketch, err } err := sourceC.mergeTo(u.gadget) return gadgetC.sketch, err } // Hereafter, the source is in HLL mode. var ( bits12 int bit3 int bit4 int ) if !gdgtEmpty { bits12 = int(gadgetC.GetCurMode()) << 1 } else { bits12 = 3 << 1 } if srcLgK < gdgtLgK { bit3 = 0 } if srcLgK > u.lgMaxK { bit4 = 16 } sw := bit4 | bit3 | bits12 switch sw { case 0, 8, 2, 10: // case 0: src <= max, src >= gdt, gdtLIST, gdtHeap // case 8: src <= max, src < gdt, gdtLIST, gdtHeap // case 2: src <= max, src >= gdt, gdtSET, gdtHeap // case 10: src <= max, src < gdt, gdtSET, gdtHeap { // Action: copy src, reverse merge w/autofold, ooof=src srcHll8, err := sourceC.CopyAs(TgtHllTypeHll8) if err != nil { return nil, err } err = gadgetC.mergeTo(srcHll8.(*hllSketchState)) return srcHll8.(*hllSketchState).sketch, err } case 16, 18: // case 16: src > max, src >= gdt, gdtList, gdtHeap // case 18: src > max, src >= gdt, gdtSet, gdtHeap { //Action: downsample src to MaxLgK, reverse merge w/autofold, ooof=src return nil, fmt.Errorf("not implemented cas 16,18") } case 4, 20: // case 4: src <= max, src >= gdt, gdtHLL, gdtHeap // case 20: src > max, src >= gdt, gdtHLL, gdtHeap { //Action: forward HLL merge w/autofold, ooof=True //merge src(Hll4,6,8,heap/mem,Mode=HLL) -> gdt(Hll8,heap,Mode=HLL) err := mergeHlltoHLLmode(source, u.gadget, srcLgK, gdgtLgK) if err != nil { return nil, err } u.gadget.(*hllSketchState).sketch.putOutOfOrder(true) return u.gadget.(*hllSketchState).sketch, nil } case 12: //src <= max, src < gdt, gdtHLL, gdtHeap { //Action: downsample gdt to srcLgK, forward HLL merge w/autofold, ooof=True return nil, fmt.Errorf("not implemented case 12") } case 6, 14: // case 6: src <= max, src >= gdt, gdtEmpty, gdtHeap // case 14: src <= max, src < gdt, gdtEmpty, gdtHeap { //Action: copy src, replace gdt, ooof=src srcHll8, err := sourceC.CopyAs(TgtHllTypeHll8) if err != nil { return nil, err } return srcHll8.(*hllSketchState).sketch, nil } case 22: //src > max, src >= gdt, gdtEmpty, gdtHeap { //Action: downsample src to lgMaxK, replace gdt, ooof=src return nil, fmt.Errorf("not implemented") } default: return nil, fmt.Errorf("impossible") } } func checkRebuildCurMinNumKxQ(sketch HllSketch) error { sketchImpl := sketch.(*hllSketchState).sketch curMode := sketch.GetCurMode() tgtHllType := sketch.GetTgtHllType() rebuild := sketchImpl.isRebuildCurMinNumKxQFlag() if !rebuild || curMode != curModeHll || tgtHllType != TgtHllTypeHll8 { return nil } sketchArrImpl := sketchImpl.(*hll8ArrayImpl) curMin := 64 numAtCurMin := 0 kxq0 := float64(uint64(1 << sketch.GetLgConfigK())) kxq1 := 0.0 itr := sketchArrImpl.iterator() for itr.nextAll() { v, err := itr.getValue() if err != nil { return err } if v > 0 { if v < 32 { inv, err := internal.InvPow2(v) if err != nil { return err } kxq0 += inv - 1.0 } else { inv, err := internal.InvPow2(v) if err != nil { return err } kxq1 += inv - 1.0 } } if v > curMin { continue } if v < curMin { curMin = v numAtCurMin = 1 } else { numAtCurMin++ } } sketchArrImpl.putKxQ0(kxq0) sketchArrImpl.putKxQ1(kxq1) sketchArrImpl.putCurMin(curMin) sketchArrImpl.putNumAtCurMin(numAtCurMin) sketchArrImpl.putRebuildCurMinNumKxQFlag(false) //HipAccum is not affected return nil } func mergeHlltoHLLmode(src HllSketch, tgt HllSketch, srcLgK int, tgtLgK int) error { sw := 0 if srcLgK > tgtLgK { sw |= 4 } if src.GetTgtHllType() != TgtHllTypeHll8 { sw |= 8 } srcK := 1 << srcLgK switch sw { case 0: //HLL_8, srcLgK=tgtLgK, src=heap, tgt=heap { srcArr := src.(*hllSketchState).sketch.(*hll8ArrayImpl).hllByteArr tgtArr := tgt.(*hllSketchState).sketch.(*hll8ArrayImpl).hllByteArr // Hack to trigger the bound check only once on tgtArr _ = tgtArr[len(srcArr)-1] for i, srcV := range srcArr { if srcV > tgtArr[i] { tgtArr[i] = srcV } } } case 8, 9: //!HLL_8, srcLgK=tgtLgK, src=heap, tgt=heap/mem { tgtAbsHllArr := tgt.(*hllSketchState).sketch.(*hll8ArrayImpl) if src.GetTgtHllType() == TgtHllTypeHll4 { src4 := src.(*hllSketchState).sketch.(*hll4ArrayImpl) auxHashMap := src4.auxHashMap curMin := src4.curMin i := 0 j := 0 for j < srcK { b := src4.hllByteArr[i] i++ value := uint(b) & loNibbleMask if value == auxToken { v, err := auxHashMap.mustFindValueFor(j) if err != nil { return err } tgtAbsHllArr.updateSlotNoKxQ(j, v) } else { tgtAbsHllArr.updateSlotNoKxQ(j, int(value)+curMin) } j++ value = uint(b) >> 4 if value == auxToken { v, err := auxHashMap.mustFindValueFor(j) if err != nil { return err } tgtAbsHllArr.updateSlotNoKxQ(j, v) } else { tgtAbsHllArr.updateSlotNoKxQ(j, int(value)+curMin) } j++ } } else { src6 := src.(*hllSketchState).sketch.(*hll6ArrayImpl) i := 0 j := 0 for j < srcK { b1 := src6.hllByteArr[i] b2 := src6.hllByteArr[i+1] b3 := src6.hllByteArr[i+2] i += 3 value := uint(b1) & 0x3f tgtAbsHllArr.updateSlotNoKxQ(j, int(value)) j++ value = uint(b1) >> 6 value |= (uint(b2) & 0x0f) << 2 tgtAbsHllArr.updateSlotNoKxQ(j, int(value)) j++ value = uint(b2) >> 4 value |= (uint(b3) & 3) << 4 tgtAbsHllArr.updateSlotNoKxQ(j, int(value)) j++ value = uint(b3) >> 2 tgtAbsHllArr.updateSlotNoKxQ(j, int(value)) j++ } } } // TODO continue implementing default: return fmt.Errorf("not implemented") } tgt.(*hllSketchState).sketch.putRebuildCurMinNumKxQFlag(true) return nil }