func()

in query/aql_postprocessor.go [66:161]


func (qc *AQLQueryContext) flushResultBuffer() {
	start := utils.Now()
	defer func() { qc.reportTiming(qc.cudaStreams[0], &start, resultFlushTiming) }()

	if qc.Results == nil {
		qc.Results = make(queryCom.AQLQueryResult)
	}

	oopkContext := qc.OOPK
	dpc := qc.resultFlushContext
	dimValues := make([]*string, len(oopkContext.Dimensions))

	var fromOffset, toOffset int
	if qc.fromTime != nil && qc.toTime != nil {
		_, fromOffset = qc.fromTime.Time.Zone()
		_, toOffset = qc.toTime.Time.Zone()
	}

	dimOffsets := make(map[int][2]int)
	for dimIndex := range oopkContext.Dimensions {
		dimVectorIndex := oopkContext.DimensionVectorIndex[dimIndex]
		valueOffset, nullOffset := queryCom.GetDimensionStartOffsets(oopkContext.NumDimsPerDimWidth, dimVectorIndex, oopkContext.ResultSize)
		dimOffsets[dimIndex] = [2]int{valueOffset, nullOffset}
	}

	for i := 0; i < oopkContext.ResultSize; i++ {
		dimReadingStart := utils.Now()
		for dimIndex := range oopkContext.Dimensions {
			offsets := dimOffsets[dimIndex]
			valueOffset, nullOffset := offsets[0], offsets[1]
			valuePtr, nullPtr := utils.MemAccess(oopkContext.dimensionVectorH, valueOffset), utils.MemAccess(oopkContext.dimensionVectorH, nullOffset)

			if qc.Query.Dimensions[dimIndex].IsTimeDimension() && dpc.dimensionValueCache[dimIndex] == nil {
				dpc.dimensionValueCache[dimIndex] = make(map[queryCom.TimeDimensionMeta]map[int64]string)
			}

			var timeDimensionMeta *queryCom.TimeDimensionMeta

			if qc.Query.Dimensions[dimIndex].IsTimeDimension() {
				timeDimensionMeta = &queryCom.TimeDimensionMeta{
					TimeBucketizer:  qc.Query.Dimensions[dimIndex].TimeBucketizer,
					TimeUnit:        qc.Query.Dimensions[dimIndex].TimeUnit,
					IsTimezoneTable: qc.timezoneTable.tableColumn != "",
					TimeZone:        qc.fixedTimezone,
					DSTSwitchTs:     qc.dstswitch,
					FromOffset:      fromOffset,
					ToOffset:        toOffset,
				}
			}

			// don't translate enum if it's for distributed mode (DataOnly == true)
			var enumDict []string
			if !qc.DataOnly {
				enumDict = dpc.reverseDicts[dimIndex]
			}

			dimValues[dimIndex] = queryCom.ReadDimension(
				valuePtr, nullPtr, i, dpc.dimensionDataTypes[dimIndex], enumDict,
				timeDimensionMeta, dpc.dimensionValueCache[dimIndex])
		}
		utils.GetRootReporter().GetTimer(utils.QueryDimReadLatency).Record(utils.Now().Sub(dimReadingStart))

		if qc.IsNonAggregationQuery {
			if qc.ResponseWriter != nil {
				nullStr := queryCom.NULLString
				for i, dimVal := range dimValues {
					if dimVal == nil {
						dimValues[i] = &nullStr
					}
				}
				valuesBytes, _ := json.Marshal(dimValues)
				if qc.resultFlushContext.rowsFlushed > 0 {
					qc.ResponseWriter.Write(bytesComma)
				}
				qc.ResponseWriter.Write(valuesBytes)
				qc.resultFlushContext.rowsFlushed++
			} else {
				qc.Results.Append(dimValues)
			}

		} else {
			measureBytes := oopkContext.MeasureBytes

			// For avg aggregation function, we only need to read first 4 bytes which is the average.
			if qc.OOPK.AggregateType == C.AGGR_AVG_FLOAT {
				measureBytes = 4
			}

			measureValue := readMeasure(
				utils.MemAccess(oopkContext.measureVectorH, i*oopkContext.MeasureBytes), oopkContext.Measure,
				measureBytes)

			qc.Results.Set(dimValues, measureValue)
		}
	}
}