in query/aql_processor.go [49:161]
func (qc *AQLQueryContext) ProcessQuery(memStore memstore.MemStore) {
defer func() {
if r := recover(); r != nil {
// find out exactly what the error was and set err
switch x := r.(type) {
case string:
qc.Error = utils.StackError(nil, x)
case error:
qc.Error = utils.StackError(x, "Panic happens when processing query")
default:
qc.Error = utils.StackError(nil, "Panic happens when processing query %v", x)
}
utils.GetLogger().Error("Releasing device memory after panic")
qc.Release()
}
}()
qc.cudaStreams[0] = cgoutils.CreateCudaStream(qc.Device)
qc.cudaStreams[1] = cgoutils.CreateCudaStream(qc.Device)
qc.OOPK.currentBatch.device = qc.Device
qc.OOPK.LiveBatchStats = oopkQueryStats{
Name2Stage: make(map[stageName]*oopkStageSummaryStats),
}
qc.OOPK.ArchiveBatchStats = oopkQueryStats{
Name2Stage: make(map[stageName]*oopkStageSummaryStats),
}
previousBatchExecutor := NewDummyBatchExecutor()
start := utils.Now()
for joinTableID, join := range qc.Query.Joins {
qc.prepareForeignTable(memStore, joinTableID, join)
if qc.Error != nil {
return
}
}
qc.reportTiming(qc.cudaStreams[0], &start, prepareForeignTableTiming)
qc.prepareTimezoneTable(memStore)
if qc.Error != nil {
return
}
// prepare geo intersection
if qc.OOPK.geoIntersection != nil {
shapeExists := qc.prepareForGeoIntersect(memStore)
if qc.Error != nil {
return
}
if !shapeExists {
// if no shape exist and geo check for point in shape
// no need to continue processing batch
if qc.OOPK.geoIntersection.inOrOut {
return
}
// if no shape exist and geo check for point not in shape
// no need to do geo intersection
qc.OOPK.geoIntersection = nil
}
}
qc.initializeNonAggResponse()
qc.initResultFlushContext()
for _, shardID := range qc.TableScanners[0].Shards {
previousBatchExecutor = qc.processShard(memStore, shardID, previousBatchExecutor)
if qc.Error != nil {
return
}
if qc.OOPK.done {
break
}
}
// query execution for last batch.
qc.runBatchExecutor(previousBatchExecutor, true)
// this code snippet does the followings:
// 1. write stats to log.
// 2. allocate host buffer for result and copy the result from device to host.
// 3. clean up device status buffers if no panic.
if qc.Debug {
qc.OOPK.LiveBatchStats.writeToLog()
qc.OOPK.ArchiveBatchStats.writeToLog()
}
start = utils.Now()
if qc.Error == nil {
// Copy the result to host memory.
qc.OOPK.ResultSize = qc.OOPK.currentBatch.resultSize
if qc.OOPK.IsHLL() {
qc.HLLQueryResult, qc.Error = qc.PostprocessAsHLLData()
} else {
if !qc.IsNonAggregationQuery {
// copy dimensions
qc.OOPK.dimensionVectorH = cgoutils.HostAlloc(qc.OOPK.ResultSize * qc.OOPK.DimRowBytes)
asyncCopyDimensionVector(qc.OOPK.dimensionVectorH, qc.OOPK.currentBatch.dimensionVectorD[0].getPointer(), qc.OOPK.ResultSize, 0,
qc.OOPK.NumDimsPerDimWidth, qc.OOPK.ResultSize, qc.OOPK.currentBatch.resultCapacity,
cgoutils.AsyncCopyDeviceToHost, qc.cudaStreams[0], qc.Device)
// copy measures
qc.OOPK.measureVectorH = cgoutils.HostAlloc(qc.OOPK.ResultSize * qc.OOPK.MeasureBytes)
cgoutils.AsyncCopyDeviceToHost(
qc.OOPK.measureVectorH, qc.OOPK.currentBatch.measureVectorD[0].getPointer(),
qc.OOPK.ResultSize*qc.OOPK.MeasureBytes, qc.cudaStreams[0], qc.Device)
cgoutils.WaitForCudaStream(qc.cudaStreams[0], qc.Device)
}
}
}
qc.reportTiming(qc.cudaStreams[0], &start, resultTransferTiming)
qc.cleanUpDeviceStatus()
qc.reportTiming(nil, &start, finalCleanupTiming)
}