query/aql_postprocessor.go (193 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package query

// #include "time_series_aggregate.h"
import "C"

import (
	"encoding/json"
	"github.com/uber/aresdb/cgoutils"
	memCom "github.com/uber/aresdb/memstore/common"
	queryCom "github.com/uber/aresdb/query/common"
	"github.com/uber/aresdb/query/expr"
	"github.com/uber/aresdb/utils"
	"unsafe"
)

// bytesComma is written between consecutive JSON-encoded rows when
// non-aggregation results are streamed to the query's response writer.
var bytesComma = []byte(",")

// Postprocess converts the internal dimension and measure vectors in binary
// format to the nested AQLQueryResult format, translating enum values back to
// their string representations (enum translation is skipped when DataOnly is
// set). For HLL queries it instead decodes qc.HLLQueryResult into qc.Results
// and records any decoding failure in qc.Error. Non-aggregation queries are
// not flushed by this method.
func (qc *AQLQueryContext) Postprocess() { oopkContext := qc.OOPK if oopkContext.IsHLL() { // skip translate enum for HLL if query is from broker result, err := queryCom.NewTimeSeriesHLLResult(qc.HLLQueryResult, queryCom.HLLDataHeader, qc.DataOnly) if err != nil { // should never be here except bug qc.Error = utils.StackError(err, "failed to read hll result") return } qc.Results = queryCom.ComputeHLLResult(result) return } if !qc.IsNonAggregationQuery { qc.flushResultBuffer() } } func (qc *AQLQueryContext) initResultFlushContext() { qc.resultFlushContext.dimensionValueCache = make([]map[queryCom.TimeDimensionMeta]map[int64]string, len(qc.OOPK.Dimensions)) qc.resultFlushContext.dimensionDataTypes = make([]memCom.DataType, len(qc.OOPK.Dimensions)) qc.resultFlushContext.reverseDicts = make(map[int][]string) oopkContext := qc.OOPK for dimIndex, dimExpr := range oopkContext.Dimensions { qc.resultFlushContext.dimensionDataTypes[dimIndex], qc.resultFlushContext.reverseDicts[dimIndex] = queryCom.GetDimensionDataType(dimExpr), qc.getEnumReverseDict(dimIndex, dimExpr) } } // flushResultBuffer reads dimension and measure data from current OOPK buffer to Results func (qc *AQLQueryContext) flushResultBuffer() { start := utils.Now() defer func() { qc.reportTiming(qc.cudaStreams[0], &start, resultFlushTiming) }() if qc.Results == nil { qc.Results = make(queryCom.AQLQueryResult) } oopkContext := qc.OOPK dpc := qc.resultFlushContext dimValues := make([]*string, len(oopkContext.Dimensions)) var fromOffset, toOffset int if qc.fromTime != nil && qc.toTime != nil { _, fromOffset = qc.fromTime.Time.Zone() _, toOffset = qc.toTime.Time.Zone() } dimOffsets := make(map[int][2]int) for dimIndex := range oopkContext.Dimensions { dimVectorIndex := oopkContext.DimensionVectorIndex[dimIndex] valueOffset, nullOffset := queryCom.GetDimensionStartOffsets(oopkContext.NumDimsPerDimWidth, dimVectorIndex, oopkContext.ResultSize) dimOffsets[dimIndex] = [2]int{valueOffset, nullOffset} } for i := 0; i < 
oopkContext.ResultSize; i++ { dimReadingStart := utils.Now() for dimIndex := range oopkContext.Dimensions { offsets := dimOffsets[dimIndex] valueOffset, nullOffset := offsets[0], offsets[1] valuePtr, nullPtr := utils.MemAccess(oopkContext.dimensionVectorH, valueOffset), utils.MemAccess(oopkContext.dimensionVectorH, nullOffset) if qc.Query.Dimensions[dimIndex].IsTimeDimension() && dpc.dimensionValueCache[dimIndex] == nil { dpc.dimensionValueCache[dimIndex] = make(map[queryCom.TimeDimensionMeta]map[int64]string) } var timeDimensionMeta *queryCom.TimeDimensionMeta if qc.Query.Dimensions[dimIndex].IsTimeDimension() { timeDimensionMeta = &queryCom.TimeDimensionMeta{ TimeBucketizer: qc.Query.Dimensions[dimIndex].TimeBucketizer, TimeUnit: qc.Query.Dimensions[dimIndex].TimeUnit, IsTimezoneTable: qc.timezoneTable.tableColumn != "", TimeZone: qc.fixedTimezone, DSTSwitchTs: qc.dstswitch, FromOffset: fromOffset, ToOffset: toOffset, } } // don't translate enum if it's for distributed mode (DataOnly == true) var enumDict []string if !qc.DataOnly { enumDict = dpc.reverseDicts[dimIndex] } dimValues[dimIndex] = queryCom.ReadDimension( valuePtr, nullPtr, i, dpc.dimensionDataTypes[dimIndex], enumDict, timeDimensionMeta, dpc.dimensionValueCache[dimIndex]) } utils.GetRootReporter().GetTimer(utils.QueryDimReadLatency).Record(utils.Now().Sub(dimReadingStart)) if qc.IsNonAggregationQuery { if qc.ResponseWriter != nil { nullStr := queryCom.NULLString for i, dimVal := range dimValues { if dimVal == nil { dimValues[i] = &nullStr } } valuesBytes, _ := json.Marshal(dimValues) if qc.resultFlushContext.rowsFlushed > 0 { qc.ResponseWriter.Write(bytesComma) } qc.ResponseWriter.Write(valuesBytes) qc.resultFlushContext.rowsFlushed++ } else { qc.Results.Append(dimValues) } } else { measureBytes := oopkContext.MeasureBytes // For avg aggregation function, we only need to read first 4 bytes which is the average. 
if qc.OOPK.AggregateType == C.AGGR_AVG_FLOAT { measureBytes = 4 } measureValue := readMeasure( utils.MemAccess(oopkContext.measureVectorH, i*oopkContext.MeasureBytes), oopkContext.Measure, measureBytes) qc.Results.Set(dimValues, measureValue) } } } // PostprocessAsHLLData serializes the query result into HLLData format. It will also release the device memory after // serialization. func (qc *AQLQueryContext) PostprocessAsHLLData() ([]byte, error) { oopkContext := qc.OOPK if oopkContext.ResultSize == 0 { return []byte{}, nil } dataTypes := make([]memCom.DataType, len(oopkContext.Dimensions)) reverseDicts := make(map[int][]string) var timeDimensions []int for dimIndex, ast := range oopkContext.Dimensions { dataTypes[dimIndex], reverseDicts[dimIndex] = queryCom.GetDimensionDataType(ast), qc.getEnumReverseDict(dimIndex, ast) if qc.Query.Dimensions[dimIndex].IsTimeDimension() { timeDimensions = append(timeDimensions, dimIndex) } } return qc.SerializeHLL(dataTypes, reverseDicts, timeDimensions) } // getEnumReverseDict returns the enum reverse dict of a ast node if it's a VarRef node, otherwise it will return // a nil slice. 
func (qc *AQLQueryContext) getEnumReverseDict(dimIndex int, expression expr.Expr) []string { varRef, ok := expression.(*expr.VarRef) if ok && memCom.IsEnumType(varRef.DataType) { return varRef.EnumReverseDict } // special handling element_at for array enum type if binExpr, ok := expression.(*expr.BinaryExpr); ok && binExpr.Op == expr.ARRAY_ELEMENT_AT { if vr, ok := binExpr.LHS.(*expr.VarRef); ok && memCom.IsEnumType(vr.DataType) { return vr.EnumReverseDict } } // return validShapeUUIDs as the reverse enum dict if dimIndex match geo dimension if qc.OOPK.geoIntersection != nil && qc.OOPK.geoIntersection.dimIndex == dimIndex { return qc.OOPK.geoIntersection.validShapeUUIDs } return nil } // ReleaseHostResultsBuffers deletes the result buffer from host memory after postprocessing func (qc *AQLQueryContext) ReleaseHostResultsBuffers() { ctx := &qc.OOPK cgoutils.HostFree(ctx.dimensionVectorH) ctx.dimensionVectorH = nil if ctx.measureVectorH != nil { cgoutils.HostFree(ctx.measureVectorH) ctx.measureVectorH = nil } // hllVectorD and hllDimRegIDCountD used for hll query only deviceFreeAndSetNil(&ctx.hllVectorD) deviceFreeAndSetNil(&ctx.hllDimRegIDCountD) // set geoIntersection to nil qc.OOPK.geoIntersection = nil } func (qc *AQLQueryContext) ResultsRowsFlushed() int { if qc.IsNonAggregationQuery { return qc.resultFlushContext.rowsFlushed } return qc.OOPK.ResultSize } func readMeasure(measureRow unsafe.Pointer, ast expr.Expr, measureBytes int) *float64 { // TODO: consider converting non-zero identity values to nil. 
var result float64 if measureBytes == 4 { switch ast.Type() { case expr.Unsigned: result = float64(*(*uint32)(measureRow)) case expr.Signed, expr.Boolean: result = float64(*(*int32)(measureRow)) case expr.Float: result = float64(*(*float32)(measureRow)) default: // Should never happen return nil } } else if measureBytes == 8 { switch ast.Type() { case expr.Unsigned: result = float64(*(*uint64)(measureRow)) case expr.Signed, expr.Boolean: result = float64(*(*int64)(measureRow)) case expr.Float: result = *(*float64)(measureRow) default: // Should never happen. return nil } } else { // should never happen return nil } return &result }