in query/aql_postprocessor.go [66:161]
func (qc *AQLQueryContext) flushResultBuffer() {
start := utils.Now()
defer func() { qc.reportTiming(qc.cudaStreams[0], &start, resultFlushTiming) }()
if qc.Results == nil {
qc.Results = make(queryCom.AQLQueryResult)
}
oopkContext := qc.OOPK
dpc := qc.resultFlushContext
dimValues := make([]*string, len(oopkContext.Dimensions))
var fromOffset, toOffset int
if qc.fromTime != nil && qc.toTime != nil {
_, fromOffset = qc.fromTime.Time.Zone()
_, toOffset = qc.toTime.Time.Zone()
}
dimOffsets := make(map[int][2]int)
for dimIndex := range oopkContext.Dimensions {
dimVectorIndex := oopkContext.DimensionVectorIndex[dimIndex]
valueOffset, nullOffset := queryCom.GetDimensionStartOffsets(oopkContext.NumDimsPerDimWidth, dimVectorIndex, oopkContext.ResultSize)
dimOffsets[dimIndex] = [2]int{valueOffset, nullOffset}
}
for i := 0; i < oopkContext.ResultSize; i++ {
dimReadingStart := utils.Now()
for dimIndex := range oopkContext.Dimensions {
offsets := dimOffsets[dimIndex]
valueOffset, nullOffset := offsets[0], offsets[1]
valuePtr, nullPtr := utils.MemAccess(oopkContext.dimensionVectorH, valueOffset), utils.MemAccess(oopkContext.dimensionVectorH, nullOffset)
if qc.Query.Dimensions[dimIndex].IsTimeDimension() && dpc.dimensionValueCache[dimIndex] == nil {
dpc.dimensionValueCache[dimIndex] = make(map[queryCom.TimeDimensionMeta]map[int64]string)
}
var timeDimensionMeta *queryCom.TimeDimensionMeta
if qc.Query.Dimensions[dimIndex].IsTimeDimension() {
timeDimensionMeta = &queryCom.TimeDimensionMeta{
TimeBucketizer: qc.Query.Dimensions[dimIndex].TimeBucketizer,
TimeUnit: qc.Query.Dimensions[dimIndex].TimeUnit,
IsTimezoneTable: qc.timezoneTable.tableColumn != "",
TimeZone: qc.fixedTimezone,
DSTSwitchTs: qc.dstswitch,
FromOffset: fromOffset,
ToOffset: toOffset,
}
}
// don't translate enum if it's for distributed mode (DataOnly == true)
var enumDict []string
if !qc.DataOnly {
enumDict = dpc.reverseDicts[dimIndex]
}
dimValues[dimIndex] = queryCom.ReadDimension(
valuePtr, nullPtr, i, dpc.dimensionDataTypes[dimIndex], enumDict,
timeDimensionMeta, dpc.dimensionValueCache[dimIndex])
}
utils.GetRootReporter().GetTimer(utils.QueryDimReadLatency).Record(utils.Now().Sub(dimReadingStart))
if qc.IsNonAggregationQuery {
if qc.ResponseWriter != nil {
nullStr := queryCom.NULLString
for i, dimVal := range dimValues {
if dimVal == nil {
dimValues[i] = &nullStr
}
}
valuesBytes, _ := json.Marshal(dimValues)
if qc.resultFlushContext.rowsFlushed > 0 {
qc.ResponseWriter.Write(bytesComma)
}
qc.ResponseWriter.Write(valuesBytes)
qc.resultFlushContext.rowsFlushed++
} else {
qc.Results.Append(dimValues)
}
} else {
measureBytes := oopkContext.MeasureBytes
// For avg aggregation function, we only need to read first 4 bytes which is the average.
if qc.OOPK.AggregateType == C.AGGR_AVG_FLOAT {
measureBytes = 4
}
measureValue := readMeasure(
utils.MemAccess(oopkContext.measureVectorH, i*oopkContext.MeasureBytes), oopkContext.Measure,
measureBytes)
qc.Results.Set(dimValues, measureValue)
}
}
}