query/common/hll.go (813 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package common import ( "bytes" "github.com/uber/aresdb/utils" "strings" "github.com/pkg/errors" memCom "github.com/uber/aresdb/memstore/common" "io" "math" "sort" "unsafe" ) const ( // OldHLLDataHeader is the old magic header for migration OldHLLDataHeader uint32 = 0xACED0101 // HLLDataHeader is the magic header written into serialized format of hyperloglog query result. HLLDataHeader uint32 = 0xACED0102 // EnumDelimiter is the delimiter to delimit enum cases. EnumDelimiter = "\u0000\n" // DenseDataLength is the length of hll dense data in bytes. DenseDataLength = 1 << 14 // 16kb // DenseThreshold is the thresold to convert sparse value to dense value. DenseThreshold = DenseDataLength / 4 ) // HLLData stores fields for serialize and deserialize an hyperloglog query result when client sets Content-Accept // header to be application/hll. // The serialized buffer of a hll data is in following format: // [uint32] magic_number [uint32] padding // // -----------query result 0------------------- // <header> // [uint32] query result 0 size [uint8] error or result [3 bytes padding] // [uint8] num_enum_columns [uint8] bytes per dim ... [padding for 8 bytes] // [uint32] result_size [uint32] raw_dim_values_vector_length // [uint8] dim_index_0... [uint8] dim_index_n [padding for 8 bytes] // [uint32] data_type_0...[uint32] data_type_n [padding for 8 bytes] // // <enum cases 0> // [uint32_t] number of bytes of enum cases [uint16] dim_index [2 bytes: padding] // <enum values 0> delimited by "\u0000\n" [padding for 8 bytes] // <end of header> // <raw dim values vector> // ... // [padding for 8 byte alignment] // // <raw hll dense vector> // ... // ------------error 1---------- // [uint32] query result 1 size [uint8] error or result [3 bytes padding] // ... type HLLData struct { NumDimsPerDimWidth DimCountsPerDimWidth ResultSize uint32 PaddedRawDimValuesVectorLength uint32 PaddedHLLVectorLength int64 DimIndexes []int DataTypes []memCom.DataType // map from dimension index => enum cases. It will // only include columns used in dimensions. EnumDicts map[int][]string } // CalculateSizes returns the header size and total size of used by this hll data. func (data *HLLData) CalculateSizes() (uint32, int64) { // num enum columns (1 byte) var headerSize = 1 // Dims per width (1 byte * numDims) headerSize += len(data.NumDimsPerDimWidth) // padding for 8 bytes headerSize = utils.AlignOffset(headerSize, 8) // result size (4 bytes) + raw_dim_values_vector_length (4 bytes) headerSize += 8 // Dim indexes. headerSize += (len(data.DimIndexes) + 7) / 8 * 8 // Data types. headerSize += (len(data.DataTypes)*4 + 7) / 8 * 8 // Enum cases. for _, enumCases := range data.EnumDicts { // number of bytes of enum cases + dim index + padding = 8 bytes. headerSize += int(8 + CalculateEnumCasesBytes(enumCases)) } totalSize := int64(headerSize) // Dim values. totalSize += int64(data.PaddedRawDimValuesVectorLength) // Counts. totalSize += int64(2*data.ResultSize+7) / 8 * 8 // HLL dense vector. totalSize += data.PaddedHLLVectorLength return uint32(headerSize), totalSize } // CalculateEnumCasesBytes calculates how many bytes the enum case values will occupy including 8 bytes alignment. func CalculateEnumCasesBytes(enumCases []string) uint32 { var size uint32 for _, enumCase := range enumCases { size += uint32(len(enumCase)) } // enum cases delimiters. size += uint32(len(enumCases)) * 2 // align by 8 bytes. return (size + 7) / 8 * 8 } // HLLRegister is the register used in the sparse representation. type HLLRegister struct { Index uint16 `json:"index"` Rho byte `json:"rho"` } // HLL stores only the dense data for now. type HLL struct { SparseData []HLLRegister // Unsorted registers. DenseData []byte // Rho by register index. NonZeroRegisters uint16 } // Merge merges (using max(rho)) the other HLL (sparse or dense) into this one (will be converted to dense). func (hll *HLL) Merge(other HLL) { hll.ConvertToDense() for _, register := range other.SparseData { oldRho := hll.DenseData[register.Index] if oldRho == 0 { hll.NonZeroRegisters++ } if oldRho < register.Rho { hll.DenseData[register.Index] = register.Rho } } for index, rho := range other.DenseData { oldRho := hll.DenseData[index] if oldRho == 0 && rho != 0 { hll.NonZeroRegisters++ } if oldRho < rho { hll.DenseData[index] = rho } } } // ConvertToDense converts the HLL to dense format. func (hll *HLL) ConvertToDense() { if len(hll.DenseData) != 0 { return } hll.DenseData = make([]byte, 1<<hllP) for _, register := range hll.SparseData { hll.DenseData[register.Index] = register.Rho } hll.SparseData = nil } // ConvertToSparse try converting the hll to sparse format if it turns out to be cheaper. func (hll *HLL) ConvertToSparse() bool { if hll.NonZeroRegisters*4 >= 1<<hllP { return false } if hll.SparseData != nil { return true } hll.SparseData = make([]HLLRegister, 0, hll.NonZeroRegisters) for index, rho := range hll.DenseData { if rho != 0 { hll.SparseData = append(hll.SparseData, HLLRegister{uint16(index), rho}) } } hll.DenseData = nil return true } // Set sets rho for the specified register index. Caller must ensure that each register is set no more than once. func (hll *HLL) Set(index uint16, rho byte) { hll.NonZeroRegisters++ if len(hll.DenseData) != 0 { hll.DenseData[index] = rho return } hll.SparseData = append(hll.SparseData, HLLRegister{index, rho}) if hll.NonZeroRegisters*4 >= 1<<hllP { hll.ConvertToDense() } } func parseOldTimeseriesHLLResult(buffer []byte, ignoreEnum bool) (AQLQueryResult, error) { // empty result buffer if len(buffer) == 0 { return AQLQueryResult{}, nil } reader := utils.NewStreamDataReader(bytes.NewBuffer(buffer)) numFourBytesDims, err := reader.ReadUint8() if err != nil { return nil, err } numTwoBytesDims, err := reader.ReadUint8() if err != nil { return nil, err } numOneBytesDims, err := reader.ReadUint8() if err != nil { return nil, err } numEnumColumns, err := reader.ReadUint8() if err != nil { return nil, err } totalDims := int(numFourBytesDims + numTwoBytesDims + numOneBytesDims) numDimsPerDimWidth := DimCountsPerDimWidth{0, 0, numFourBytesDims, numTwoBytesDims, numOneBytesDims} resultSize, err := reader.ReadUint32() if err != nil { return nil, err } paddedRawDimValuesVectorLength, err := reader.ReadUint32() if err != nil { return nil, err } if err := reader.SkipBytes(4); err != nil { return nil, err } dimIndexes := make([]uint8, totalDims) for i := range dimIndexes { dimIndexes[i], err = reader.ReadUint8() if err != nil { return nil, err } } if err = reader.ReadPadding(int(totalDims), 8); err != nil { return nil, err } dataTypes := make([]memCom.DataType, totalDims) for i := range dataTypes { rawDataType, err := reader.ReadUint32() if err != nil { return nil, err } dataType, err := memCom.NewDataType(rawDataType) if err != nil { return nil, err } dataTypes[i] = dataType } if err = reader.ReadPadding(int(totalDims)*4, 8); err != nil { return nil, err } enumDicts := make(map[int][]string) var i uint8 for ; i < numEnumColumns; i++ { enumCasesBytes, err := reader.ReadUint32() if err != nil { return nil, err } dimIdx, err := reader.ReadUint16() if err != nil { return nil, err } reader.SkipBytes(2) rawEnumCases := make([]byte, enumCasesBytes) if err = reader.Read(rawEnumCases); err != nil { return nil, err } enumCases := strings.Split(string(rawEnumCases), EnumDelimiter) // remove last empty element. enumCases = enumCases[:len(enumCases)-1] enumDicts[int(dimIdx)] = enumCases } headerSize := reader.GetBytesRead() result := make(AQLQueryResult) paddedCountLength := uint32(2*resultSize+7) / 8 * 8 dimValuesVector := unsafe.Pointer(&buffer[headerSize]) countVector := unsafe.Pointer(&buffer[headerSize+paddedRawDimValuesVectorLength]) hllVector := unsafe.Pointer(&buffer[headerSize+paddedRawDimValuesVectorLength+paddedCountLength]) dimOffsets := make([][2]int, totalDims) dimValues := make([]*string, totalDims) for i := 0; i < totalDims; i++ { dimIndex := int(dimIndexes[i]) valueOffset, nullOffset := GetDimensionStartOffsets(numDimsPerDimWidth, dimIndex, int(resultSize)) dimOffsets[i] = [2]int{valueOffset, nullOffset} } var currentOffset int64 for i := 0; i < int(resultSize); i++ { for dimIndex := 0; dimIndex < totalDims; dimIndex++ { offsets := dimOffsets[dimIndex] valueOffset, nullOffset := offsets[0], offsets[1] valuePtr, nullPtr := memAccess(dimValuesVector, valueOffset), memAccess(dimValuesVector, nullOffset) enumDict := []string{} if !ignoreEnum { enumDict = enumDicts[dimIndex] } dimValues[dimIndex] = ReadDimension(valuePtr, nullPtr, i, dataTypes[dimIndex], enumDict, nil, nil) } count := *(*uint16)(memAccess(countVector, int(2*i))) hll := readHLL(hllVector, count, &currentOffset) result.SetHLL(dimValues, hll) } return result, nil } func parseTimeseriesHLLResult(buffer []byte, ignoreEnum bool) (AQLQueryResult, error) { // empty result buffer if len(buffer) == 0 { return AQLQueryResult{}, nil } reader := utils.NewStreamDataReader(bytes.NewBuffer(buffer)) numEnumColumns, err := reader.ReadUint8() if err != nil { return nil, err } var numDimsPerDimWidth DimCountsPerDimWidth err = reader.Read([]byte(numDimsPerDimWidth[:])) if err != nil { return AQLQueryResult{}, nil } totalDims := 0 for _, dimCount := range numDimsPerDimWidth { totalDims += int(dimCount) } err = reader.ReadPadding(int(reader.GetBytesRead()), 8) if err != nil { return nil, err } resultSize, err := reader.ReadUint32() if err != nil { return nil, err } paddedRawDimValuesVectorLength, err := reader.ReadUint32() if err != nil { return nil, err } dimIndexes := make([]uint8, totalDims) for i := range dimIndexes { dimIndexes[i], err = reader.ReadUint8() if err != nil { return nil, err } } if err = reader.ReadPadding(int(totalDims), 8); err != nil { return nil, err } dataTypes := make([]memCom.DataType, totalDims) for i := range dataTypes { rawDataType, err := reader.ReadUint32() if err != nil { return nil, err } dataType, err := memCom.NewDataType(rawDataType) if err != nil { return nil, err } dataTypes[i] = dataType } if err = reader.ReadPadding(int(totalDims)*4, 8); err != nil { return nil, err } enumDicts := make(map[int][]string) var i uint8 for ; i < numEnumColumns; i++ { enumCasesBytes, err := reader.ReadUint32() if err != nil { return nil, err } dimIdx, err := reader.ReadUint16() if err != nil { return nil, err } reader.SkipBytes(2) rawEnumCases := make([]byte, enumCasesBytes) if err = reader.Read(rawEnumCases); err != nil { return nil, err } enumCases := strings.Split(string(rawEnumCases), EnumDelimiter) // remove last empty element. enumCases = enumCases[:len(enumCases)-1] enumDicts[int(dimIdx)] = enumCases } headerSize := reader.GetBytesRead() result := make(AQLQueryResult) paddedCountLength := uint32(2*resultSize+7) / 8 * 8 dimValuesVector := unsafe.Pointer(&buffer[headerSize]) countVector := unsafe.Pointer(&buffer[headerSize+paddedRawDimValuesVectorLength]) hllVector := unsafe.Pointer(&buffer[headerSize+paddedRawDimValuesVectorLength+paddedCountLength]) dimOffsets := make([][2]int, totalDims) dimValues := make([]*string, totalDims) for i := 0; i < totalDims; i++ { dimIndex := int(dimIndexes[i]) valueOffset, nullOffset := GetDimensionStartOffsets(numDimsPerDimWidth, dimIndex, int(resultSize)) dimOffsets[i] = [2]int{valueOffset, nullOffset} } var currentOffset int64 for i := 0; i < int(resultSize); i++ { for dimIndex := 0; dimIndex < totalDims; dimIndex++ { offsets := dimOffsets[dimIndex] valueOffset, nullOffset := offsets[0], offsets[1] valuePtr, nullPtr := memAccess(dimValuesVector, valueOffset), memAccess(dimValuesVector, nullOffset) enumDict := []string{} if !ignoreEnum { enumDict = enumDicts[dimIndex] } dimValues[dimIndex] = ReadDimension(valuePtr, nullPtr, i, dataTypes[dimIndex], enumDict, nil, nil) } count := *(*uint16)(memAccess(countVector, int(2*i))) hll := readHLL(hllVector, count, &currentOffset) result.SetHLL(dimValues, hll) } return result, nil } // ComputeHLLResult computes hll result func ComputeHLLResult(result AQLQueryResult) AQLQueryResult { return computeHLLResultRecursive(result).(AQLQueryResult) } // computeHLLResultRecursive computes hll value func computeHLLResultRecursive(result interface{}) interface{} { switch r := result.(type) { case AQLQueryResult: for k, v := range r { r[k] = computeHLLResultRecursive(v) } return r case map[string]interface{}: for k, v := range r { r[k] = computeHLLResultRecursive(v) } return r case HLL: return r.Compute() default: // return original for all other types return r } } // NewTimeSeriesHLLResult creates a new NewTimeSeriesHLLResult and deserialize the buffer into the result. func NewTimeSeriesHLLResult(buffer []byte, magicHeader uint32, ignoreEnum bool) (AQLQueryResult, error) { switch magicHeader { case OldHLLDataHeader: return parseOldTimeseriesHLLResult(buffer, ignoreEnum) case HLLDataHeader: return parseTimeseriesHLLResult(buffer, ignoreEnum) default: // should not happen return nil, utils.StackError(nil, "magic header version unsupported: %d", magicHeader) } } // memAccess access memory location with starting pointer and an offset. func memAccess(p unsafe.Pointer, offset int) unsafe.Pointer { return unsafe.Pointer(uintptr(p) + uintptr(offset)) } // readHLL reads the HLL struct from the raw buffer and returns next offset func readHLL(hllVector unsafe.Pointer, count uint16, currentOffset *int64) HLL { var sparseData []HLLRegister var nonZeroRegisters uint16 var denseData []byte if count < DenseThreshold { var i uint16 sparseData = make([]HLLRegister, 0, count) for ; i < count; i++ { data := *(*uint32)(memAccess(hllVector, int(*currentOffset))) index := uint16(data) // Big-endian from UNHEX... rho := byte((data >> 16) & 0xFF) sparseData = append(sparseData, HLLRegister{ Index: index, Rho: rho, }) *currentOffset += 4 } nonZeroRegisters = count } else { denseData = (*(*[DenseDataLength]byte)((memAccess(hllVector, int(*currentOffset)))))[:] *currentOffset += DenseDataLength for _, b := range denseData { if b != 0 { nonZeroRegisters++ } } } return HLL{ DenseData: denseData, SparseData: sparseData, NonZeroRegisters: nonZeroRegisters, } } // ParseHLLQueryResults will parse the response body into a slice of query results and a slice of errors. func ParseHLLQueryResults(data []byte, ignoreEnum bool) (queryResults []AQLQueryResult, queryErrors []error, err error) { reader := utils.NewStreamDataReader(bytes.NewBuffer(data)) var magicHeader uint32 magicHeader, err = reader.ReadUint32() if err != nil { return } if magicHeader != OldHLLDataHeader && magicHeader != HLLDataHeader { err = utils.StackError(nil, "header %x does not match HLLDataHeader %x or %x", magicHeader, OldHLLDataHeader, HLLDataHeader) return } reader.SkipBytes(4) var size uint32 var isErr uint8 for size, err = reader.ReadUint32(); err == nil; size, err = reader.ReadUint32() { if isErr, err = reader.ReadUint8(); err != nil { return } reader.SkipBytes(3) bs := make([]byte, size) err = reader.Read(bs) if err != nil { break } if isErr != 0 { queryErrors = append(queryErrors, errors.New(string(bs))) queryResults = append(queryResults, nil) } else { var res AQLQueryResult if res, err = NewTimeSeriesHLLResult(bs, magicHeader, ignoreEnum); err != nil { return } queryResults = append(queryResults, res) queryErrors = append(queryErrors, nil) } } if err == io.EOF { err = nil } return } type hllBiasByDistance struct { distance, bias float64 } func getEstimateBias(estimate float64) float64 { i := sort.Search(len(hllRawEstimates), func(i int) bool { return estimate < hllRawEstimates[i] }) // Find nearest k neighbors. k := 6 startIdx := i - 1 - k endIdx := i + k if startIdx < 0 { startIdx = 0 } if endIdx > len(hllRawEstimates) { endIdx = len(hllRawEstimates) } biases := make(hllBiasesByDistances, endIdx-startIdx) for i := startIdx; i < endIdx; i++ { biases[i-startIdx].distance = (hllRawEstimates[i] - estimate) * (hllRawEstimates[i] - estimate) biases[i-startIdx].bias = hllBiases[i] } sort.Sort(biases) biasSum := 0.0 for i := 0; i < k; i++ { biasSum += biases[i].bias } return biasSum / float64(k) } // Decode decodes the HLL from cache cache. // Interprets as dense or sparse format based on len(data). func (hll *HLL) Decode(data []byte) { if len(data) == 1<<hllP { hll.DenseData = data hll.SparseData = nil hll.NonZeroRegisters = 0 for _, rho := range data { if rho != 0 { hll.NonZeroRegisters++ } } } else { hll.DenseData = nil hll.SparseData = make([]HLLRegister, len(data)/3) hll.NonZeroRegisters = uint16(len(data) / 3) for i := 0; i < len(data)/3; i++ { var register HLLRegister register.Index = uint16(data[i*3]) | (uint16(data[i*3+1]) << 8) register.Rho = data[i*3+2] hll.SparseData[i] = register } } } // Encode encodes the HLL for cache storage. // Dense format will have a length of 1<<hllP. // Sparse format will have a smaller length func (hll *HLL) Encode() []byte { if len(hll.DenseData) != 0 { return hll.DenseData } return hll.encodeSparse(false) } // EncodeBinary converts HLL to binary format // aligns to 4 bytes for sparse hll // used to build response for application/hll queries from HLL struct func (hll *HLL) EncodeBinary() []byte { if len(hll.DenseData) != 0 { return hll.DenseData } return hll.encodeSparse(true) } // encode sparse HLL value in 2 modes // 1. padding (1 byte) | rho (1 byte) | index (2 byte) endianness based on the machine itself // 2. rho (1 byte) | index (2 byte) small endian // based on `padding` parameter func (hll *HLL) encodeSparse(padding bool) []byte { recordValueBytes := 3 if padding { recordValueBytes = 4 } data := make([]byte, (recordValueBytes)*len(hll.SparseData)) for i, register := range hll.SparseData { if padding { *(*uint32)(unsafe.Pointer(&data[i*recordValueBytes])) = (uint32(int8(register.Rho)))<<16 | uint32(register.Index) } else { data[i*recordValueBytes] = byte(register.Index & 0xff) data[i*recordValueBytes+1] = byte(register.Index >> 8) data[i*recordValueBytes+2] = register.Rho } } return data } // Compute computes the result of the HLL. func (hll *HLL) Compute() float64 { nonZeroRegisters := float64(hll.NonZeroRegisters) m := float64(uint64(1) << hllP) // Sum of reciproclas of rhos var sumOfReciprocals float64 for _, register := range hll.SparseData { sumOfReciprocals += 1.0 / float64(uint64(1)<<register.Rho) } if len(hll.DenseData) == 0 { // Add missing rho reciprocals for sparse form. sumOfReciprocals += m - nonZeroRegisters } for _, rho := range hll.DenseData { sumOfReciprocals += 1.0 / float64(uint64(1)<<rho) } // Initial estimation. alpha := 0.7213 / (1 + 1.079/m) estimate := alpha * m * m / sumOfReciprocals // Bias correction. if estimate <= 5.0*m { estimate -= getEstimateBias(estimate) } estimateH := estimate if nonZeroRegisters < m { // Linear counting estimateH = m * math.Log(m/(m-nonZeroRegisters)) } if estimateH <= hllThreshold { estimate = estimateH } // Round return float64(uint64(estimate)) } type hllBiasesByDistances []hllBiasByDistance func (b hllBiasesByDistances) Len() int { return len(b) } func (b hllBiasesByDistances) Swap(i, j int) { b[i], b[j] = b[j], b[i] } func (b hllBiasesByDistances) Less(i, j int) bool { return b[i].distance < b[j].distance } // threshold and bias data taken from google's bias correction data set: // https://docs.google.com/document/d/1gyjfMHy43U9OWBXxfaeG-3MjGzejW1dlpyMwEYAAWEI/view?fullscreen# var hllP byte = 14 var hllThreshold = 15500.0 // precision 14 var hllRawEstimates = []float64{ 11817.475, 12015.0046, 12215.3792, 12417.7504, 12623.1814, 12830.0086, 13040.0072, 13252.503, 13466.178, 13683.2738, 13902.0344, 14123.9798, 14347.394, 14573.7784, 14802.6894, 15033.6824, 15266.9134, 15502.8624, 15741.4944, 15980.7956, 16223.8916, 16468.6316, 16715.733, 16965.5726, 17217.204, 17470.666, 17727.8516, 17986.7886, 18247.6902, 18510.9632, 18775.304, 19044.7486, 19314.4408, 19587.202, 19862.2576, 20135.924, 20417.0324, 20697.9788, 20979.6112, 21265.0274, 21550.723, 21841.6906, 22132.162, 22428.1406, 22722.127, 23020.5606, 23319.7394, 23620.4014, 23925.2728, 24226.9224, 24535.581, 24845.505, 25155.9618, 25470.3828, 25785.9702, 26103.7764, 26420.4132, 26742.0186, 27062.8852, 27388.415, 27714.6024, 28042.296, 28365.4494, 28701.1526, 29031.8008, 29364.2156, 29704.497, 30037.1458, 30380.111, 30723.8168, 31059.5114, 31404.9498, 31751.6752, 32095.2686, 32444.7792, 32794.767, 33145.204, 33498.4226, 33847.6502, 34209.006, 34560.849, 34919.4838, 35274.9778, 35635.1322, 35996.3266, 36359.1394, 36722.8266, 37082.8516, 37447.7354, 37815.9606, 38191.0692, 38559.4106, 38924.8112, 39294.6726, 39663.973, 40042.261, 40416.2036, 40779.2036, 41161.6436, 41540.9014, 41921.1998, 42294.7698, 42678.5264, 43061.3464, 43432.375, 43818.432, 44198.6598, 44583.0138, 44970.4794, 45353.924, 45729.858, 46118.2224, 46511.5724, 46900.7386, 47280.6964, 47668.1472, 48055.6796, 48446.9436, 48838.7146, 49217.7296, 49613.7796, 50010.7508, 50410.0208, 50793.7886, 51190.2456, 51583.1882, 51971.0796, 52376.5338, 52763.319, 53165.5534, 53556.5594, 53948.2702, 54346.352, 54748.7914, 55138.577, 55543.4824, 55941.1748, 56333.7746, 56745.1552, 57142.7944, 57545.2236, 57935.9956, 58348.5268, 58737.5474, 59158.5962, 59542.6896, 59958.8004, 60349.3788, 60755.0212, 61147.6144, 61548.194, 61946.0696, 62348.6042, 62763.603, 63162.781, 63560.635, 63974.3482, 64366.4908, 64771.5876, 65176.7346, 65597.3916, 65995.915, 66394.0384, 66822.9396, 67203.6336, 67612.2032, 68019.0078, 68420.0388, 68821.22, 69235.8388, 69640.0724, 70055.155, 70466.357, 70863.4266, 71276.2482, 71677.0306, 72080.2006, 72493.0214, 72893.5952, 73314.5856, 73714.9852, 74125.3022, 74521.2122, 74933.6814, 75341.5904, 75743.0244, 76166.0278, 76572.1322, 76973.1028, 77381.6284, 77800.6092, 78189.328, 78607.0962, 79012.2508, 79407.8358, 79825.725, 80238.701, 80646.891, 81035.6436, 81460.0448, 81876.3884} // precision 14 var hllBiases = []float64{ 11816.475, 11605.0046, 11395.3792, 11188.7504, 10984.1814, 10782.0086, 10582.0072, 10384.503, 10189.178, 9996.2738, 9806.0344, 9617.9798, 9431.394, 9248.7784, 9067.6894, 8889.6824, 8712.9134, 8538.8624, 8368.4944, 8197.7956, 8031.8916, 7866.6316, 7703.733, 7544.5726, 7386.204, 7230.666, 7077.8516, 6926.7886, 6778.6902, 6631.9632, 6487.304, 6346.7486, 6206.4408, 6070.202, 5935.2576, 5799.924, 5671.0324, 5541.9788, 5414.6112, 5290.0274, 5166.723, 5047.6906, 4929.162, 4815.1406, 4699.127, 4588.5606, 4477.7394, 4369.4014, 4264.2728, 4155.9224, 4055.581, 3955.505, 3856.9618, 3761.3828, 3666.9702, 3575.7764, 3482.4132, 3395.0186, 3305.8852, 3221.415, 3138.6024, 3056.296, 2970.4494, 2896.1526, 2816.8008, 2740.2156, 2670.497, 2594.1458, 2527.111, 2460.8168, 2387.5114, 2322.9498, 2260.6752, 2194.2686, 2133.7792, 2074.767, 2015.204, 1959.4226, 1898.6502, 1850.006, 1792.849, 1741.4838, 1687.9778, 1638.1322, 1589.3266, 1543.1394, 1496.8266, 1447.8516, 1402.7354, 1361.9606, 1327.0692, 1285.4106, 1241.8112, 1201.6726, 1161.973, 1130.261, 1094.2036, 1048.2036, 1020.6436, 990.901400000002, 961.199800000002, 924.769800000002, 899.526400000002, 872.346400000002, 834.375, 810.432000000001, 780.659800000001, 756.013800000001, 733.479399999997, 707.923999999999, 673.858, 652.222399999999, 636.572399999997, 615.738599999997, 586.696400000001, 564.147199999999, 541.679600000003, 523.943599999999, 505.714599999999, 475.729599999999, 461.779600000002, 449.750800000002, 439.020799999998, 412.7886, 400.245600000002, 383.188199999997, 362.079599999997, 357.533799999997, 334.319000000003, 327.553399999997, 308.559399999998, 291.270199999999, 279.351999999999, 271.791400000002, 252.576999999997, 247.482400000001, 236.174800000001, 218.774599999997, 220.155200000001, 208.794399999999, 201.223599999998, 182.995600000002, 185.5268, 164.547400000003, 176.5962, 150.689599999998, 157.8004, 138.378799999999, 134.021200000003, 117.614399999999, 108.194000000003, 97.0696000000025, 89.6042000000016, 95.6030000000028, 84.7810000000027, 72.635000000002, 77.3482000000004, 59.4907999999996, 55.5875999999989, 50.7346000000034, 61.3916000000027, 50.9149999999936, 39.0384000000049, 58.9395999999979, 29.633600000001, 28.2032000000036, 26.0078000000067, 17.0387999999948, 9.22000000000116, 13.8387999999977, 8.07240000000456, 14.1549999999988, 15.3570000000036, 3.42660000000615, 6.24820000000182, -2.96940000000177, -8.79940000000352, -5.97860000000219, -14.4048000000039, -3.4143999999942, -13.0148000000045, -11.6977999999945, -25.7878000000055, -22.3185999999987, -24.409599999999, -31.9756000000052, -18.9722000000038, -22.8678000000073, -30.8972000000067, -32.3715999999986, -22.3907999999938, -43.6720000000059, -35.9038, -39.7492000000057, -54.1641999999993, -45.2749999999942, -42.2989999999991, -44.1089999999967, -64.3564000000042, -49.9551999999967, -42.6116000000038} // HLLDataWriter is the struct to serialize HLL Data struct. type HLLDataWriter struct { HLLData Buffer []byte } // SerializeHeader serialize HLL header // -----------query result 0------------------- // <header> // [uint8] num_enum_columns [uint8] bytes per dim ... [padding for 8 bytes] // [uint32] result_size [uint32] raw_dim_values_vector_length // [uint8] dim_index_0... [uint8] dim_index_n [padding for 8 bytes] // [uint32] data_type_0...[uint32] data_type_n [padding for 8 bytes] // // <enum cases 0> // [uint32_t] number of bytes of enum cases [uint16] column_index [2 bytes: padding] // <enum values 0> delimited by "\u0000\n" [padding for 8 bytes] // // <end of header> func (builder *HLLDataWriter) SerializeHeader() error { writer := utils.NewBufferWriter(builder.Buffer) // num_enum_columns if err := writer.AppendUint8(uint8(len(builder.EnumDicts))); err != nil { return err } // bytes per dim if err := writer.Append([]byte(builder.NumDimsPerDimWidth[:])); err != nil { return err } writer.AlignBytes(8) // result_size if err := writer.AppendUint32(builder.ResultSize); err != nil { return err } // raw_dim_values_vector_length if err := writer.AppendUint32(builder.PaddedRawDimValuesVectorLength); err != nil { return err } // dim_indexes for _, dimIndex := range builder.DimIndexes { if err := writer.AppendUint8(uint8(dimIndex)); err != nil { return err } } writer.AlignBytes(8) // data_types for _, dataType := range builder.DataTypes { if err := writer.AppendUint32(uint32(dataType)); err != nil { return err } } writer.AlignBytes(8) // Write enum cases. for dimIdx, enumCases := range builder.EnumDicts { enumCasesBytes := CalculateEnumCasesBytes(enumCases) if err := writer.AppendUint32(enumCasesBytes); err != nil { return err } if err := writer.AppendUint16(uint16(dimIdx)); err != nil { return err } // padding writer.SkipBytes(2) var enumCaseBytesWritten uint32 for _, enumCase := range enumCases { if err := writer.Append([]byte(enumCase)); err != nil { return err } if err := writer.Append([]byte(EnumDelimiter)); err != nil { return err } enumCaseBytesWritten += uint32(len(enumCase)) + 2 } writer.SkipBytes(int(enumCasesBytes - enumCaseBytesWritten)) } return nil } // HLLQueryResults holds the buffer to store multiple hll query results or errors. type HLLQueryResults struct { buffer bytes.Buffer } // NewHLLQueryResults returns a new NewHLLQueryResults and writes the magical header and // padding to underlying buffer. func NewHLLQueryResults() *HLLQueryResults { r := &HLLQueryResults{} header := HLLDataHeader r.buffer.Write((*(*[4]byte)(unsafe.Pointer(&header)))[:]) // Padding. var bs [4]byte r.buffer.Write(bs[:]) return r } // WriteResult write result to the buffer. func (r *HLLQueryResults) WriteResult(result []byte) { totalSize := uint32(len(result)) // Write total size. r.buffer.Write((*(*[4]byte)(unsafe.Pointer(&totalSize)))[:]) // 0 stands for result. r.buffer.WriteByte(byte(0)) // Padding. var bs [3]byte r.buffer.Write(bs[:]) r.buffer.Write(result) } // WriteError write error to the buffer. func (r *HLLQueryResults) WriteError(err error) { totalSize := len(err.Error()) // Write total size. r.buffer.Write((*(*[4]byte)(unsafe.Pointer(&totalSize)))[:]) // 1 stands for error. r.buffer.WriteByte(byte(1)) // Padding. var bs [3]byte r.buffer.Write(bs[:]) strErr := err.Error() padding := (8 - (len(strErr) & 7)) & 8 r.buffer.Write([]byte(strErr)) if padding > 0 { paddingBytes := make([]byte, padding) r.buffer.Write(paddingBytes) } } // GetBytes returns the underlying bytes. func (r *HLLQueryResults) GetBytes() []byte { return r.buffer.Bytes() } // BuildVectorsFromHLLResult traverses input HLL query result and builds byte slices // result must have HLL in the leave nodes // this function is useful when converting HLL query result to it's binary format // dimDataTypes stores types of each dimension, in the the same order as in query // dimensionVectorIndex stores re-ordered dimension index, sorted by dim datatype width func BuildVectorsFromHLLResult(result AQLQueryResult, dimDataTypes []memCom.DataType, enumDicts map[int]map[string]int, dimensionVectorIndex []int) (hllVector, dimVector, countVector []byte, err error) { hllVector = []byte{} countVector = []byte{} dimVectors := make([][]byte, len(dimDataTypes)) validityVectors := make([][]byte, len(dimDataTypes)) _, err = traverseRecursive(0, map[string]interface{}(result), dimDataTypes, enumDicts, &hllVector, &countVector, dimVectors, validityVectors) if err != nil { return } dimVector = []byte{} // append in descending order of width for _, idx := range dimensionVectorIndex { dimVector = append(dimVector, dimVectors[idx]...) } for _, idx := range dimensionVectorIndex { dimVector = append(dimVector, validityVectors[idx]...) } return } // helper function that traverses HLL query result in post order // returns: size is the total num dim rows of the subtree rooted by current node func traverseRecursive(dimIdx int, curr interface{}, dimDataTypes []memCom.DataType, enumDicts map[int]map[string]int, hllVector, countVector *[]byte, dimVectors, validityVectors [][]byte) (size int, err error) { switch v := curr.(type) { case map[string]interface{}: dimDataType := dimDataTypes[dimIdx] dimValueBytes := memCom.DataTypeBytes(dimDataType) // iterate map in order keys := make([]string, len(v)) kidx := 0 for k, _ := range v { keys[kidx] = k kidx++ } sort.Strings(keys) for _, k := range keys { v := v[k] // visit child first var childSize int childSize, err = traverseRecursive(dimIdx+1, v, dimDataTypes, enumDicts, hllVector, countVector, dimVectors, validityVectors) if err != nil { return } // convert dimension var ( dataVal memCom.DataValue isValid uint8 = 1 ) if NULLString == k { isValid = 0 } else { if enumDict, ok := enumDicts[dimIdx]; ok { enumVal := enumDict[k] if dimValueBytes == 1 { ui8 := uint8(enumVal) dataVal = memCom.DataValue{ Valid: true, OtherVal: unsafe.Pointer(&ui8), } } else if dimValueBytes == 2 { ui16 := uint16(enumVal) dataVal = memCom.DataValue{ Valid: true, OtherVal: unsafe.Pointer(&ui16), } } else { err = utils.StackError(nil, "data width %d doesn't match any enum", dimValueBytes) } } else { dataVal, err = memCom.ValueFromString(k, dimDataType) if err != nil { return } } } var bs []byte if dataVal.Valid { byteVal := dataVal.OtherVal switch dimValueBytes { case 8: bs = (*[8]byte)(byteVal)[:] case 4: bs = (*[4]byte)(byteVal)[:] case 2: bs = (*[2]byte)(byteVal)[:] case 1: bs = (*[1]byte)(byteVal)[:] } } else { bs = make([]byte, dimValueBytes) } for i := 0; i < childSize; i++ { dimVectors[dimIdx] = append(dimVectors[dimIdx], bs...) validityVectors[dimIdx] = append(validityVectors[dimIdx], byte(isValid)) } size += childSize } case HLL: count := v.NonZeroRegisters if count < DenseThreshold { if !v.ConvertToSparse() { err = utils.StackError(nil, "Failed to convert HLL to sparse %+v", v) return } } else { v.ConvertToDense() } bs := v.EncodeBinary() *hllVector = append(*hllVector, bs...) *countVector = append(*countVector, (*((*[2]byte)(unsafe.Pointer(&count))))[:]...) size = 1 default: err = utils.StackError(nil, "unknown type %+v", curr) } return }