func()

in query/aql_processor.go [49:161]


// ProcessQuery runs the query held by qc against memStore. Nothing is
// returned: failures are recorded in qc.Error, and on success the result is
// copied to host memory (qc.HLLQueryResult for HLL queries,
// qc.OOPK.dimensionVectorH and qc.OOPK.measureVectorH for the remaining
// aggregation queries; nothing is copied here for non-aggregation queries).
//
// The steps, in order, are: create two CUDA streams on qc.Device, prepare the
// foreign tables for every join, prepare the timezone table, prepare the
// optional geo intersection, process each shard listed in
// qc.TableScanners[0] (stopping early once qc.OOPK.done is set), run the
// batch executor left over from the last shard, and finally copy the result
// from device to host.
//
// A panic raised anywhere in this method is recovered, converted into
// qc.Error, and followed by qc.Release(). The early returns taken when
// qc.Error is set (or when no geo shape exists) do not call qc.Release() or
// qc.cleanUpDeviceStatus() themselves.
// NOTE(review): presumably the caller releases qc on those paths — confirm.
func (qc *AQLQueryContext) ProcessQuery(memStore memstore.MemStore) {
	defer func() {
		if r := recover(); r != nil {
			// find out exactly what the error was and set err
			switch x := r.(type) {
			case string:
				// Pass the panic text as an argument, never as the format
				// string: a '%' inside it must not be read as a formatting
				// verb.
				qc.Error = utils.StackError(nil, "%s", x)
			case error:
				qc.Error = utils.StackError(x, "Panic happens when processing query")
			default:
				qc.Error = utils.StackError(nil, "Panic happens when processing query %v", x)
			}
			utils.GetLogger().Error("Releasing device memory after panic")
			qc.Release()
		}
	}()

	// Two streams are created up front; this method itself only uses
	// cudaStreams[0] directly.
	qc.cudaStreams[0] = cgoutils.CreateCudaStream(qc.Device)
	qc.cudaStreams[1] = cgoutils.CreateCudaStream(qc.Device)
	qc.OOPK.currentBatch.device = qc.Device
	// Start from empty per-stage stats for live and archive batches.
	qc.OOPK.LiveBatchStats = oopkQueryStats{
		Name2Stage: make(map[stageName]*oopkStageSummaryStats),
	}
	qc.OOPK.ArchiveBatchStats = oopkQueryStats{
		Name2Stage: make(map[stageName]*oopkStageSummaryStats),
	}

	// Each processShard call receives the executor produced so far and hands
	// back the one to carry forward; the last one is run after the shard loop.
	previousBatchExecutor := NewDummyBatchExecutor()

	start := utils.Now()
	for joinTableID, join := range qc.Query.Joins {
		qc.prepareForeignTable(memStore, joinTableID, join)
		if qc.Error != nil {
			return
		}
	}
	qc.reportTiming(qc.cudaStreams[0], &start, prepareForeignTableTiming)

	qc.prepareTimezoneTable(memStore)
	if qc.Error != nil {
		return
	}

	// prepare geo intersection
	if qc.OOPK.geoIntersection != nil {
		shapeExists := qc.prepareForGeoIntersect(memStore)
		if qc.Error != nil {
			return
		}
		if !shapeExists {
			// if no shape exist and geo check for point in shape
			// no need to continue processing batch
			if qc.OOPK.geoIntersection.inOrOut {
				return
			}
			// if no shape exist and geo check for point not in shape
			// no need to do geo intersection
			qc.OOPK.geoIntersection = nil
		}
	}

	qc.initializeNonAggResponse()

	qc.initResultFlushContext()

	for _, shardID := range qc.TableScanners[0].Shards {
		previousBatchExecutor = qc.processShard(memStore, shardID, previousBatchExecutor)
		if qc.Error != nil {
			return
		}
		if qc.OOPK.done {
			break
		}
	}

	// query execution for last batch.
	// NOTE(review): the `true` argument presumably marks this as the final
	// batch — confirm against runBatchExecutor.
	qc.runBatchExecutor(previousBatchExecutor, true)

	// this code snippet does the following:
	// 1. write stats to log.
	// 2. allocate host buffer for result and copy the result from device to host.
	// 3. clean up device status buffers if no panic.
	if qc.Debug {
		qc.OOPK.LiveBatchStats.writeToLog()
		qc.OOPK.ArchiveBatchStats.writeToLog()
	}

	start = utils.Now()
	if qc.Error == nil {
		// Copy the result to host memory.
		qc.OOPK.ResultSize = qc.OOPK.currentBatch.resultSize
		if qc.OOPK.IsHLL() {
			qc.HLLQueryResult, qc.Error = qc.PostprocessAsHLLData()
		} else if !qc.IsNonAggregationQuery {
			// copy dimensions
			qc.OOPK.dimensionVectorH = cgoutils.HostAlloc(qc.OOPK.ResultSize * qc.OOPK.DimRowBytes)
			asyncCopyDimensionVector(qc.OOPK.dimensionVectorH, qc.OOPK.currentBatch.dimensionVectorD[0].getPointer(), qc.OOPK.ResultSize, 0,
				qc.OOPK.NumDimsPerDimWidth, qc.OOPK.ResultSize, qc.OOPK.currentBatch.resultCapacity,
				cgoutils.AsyncCopyDeviceToHost, qc.cudaStreams[0], qc.Device)
			// copy measures
			qc.OOPK.measureVectorH = cgoutils.HostAlloc(qc.OOPK.ResultSize * qc.OOPK.MeasureBytes)
			cgoutils.AsyncCopyDeviceToHost(
				qc.OOPK.measureVectorH, qc.OOPK.currentBatch.measureVectorD[0].getPointer(),
				qc.OOPK.ResultSize*qc.OOPK.MeasureBytes, qc.cudaStreams[0], qc.Device)
			// Both copies above were issued asynchronously on cudaStreams[0];
			// wait on that stream before the host buffers are used.
			cgoutils.WaitForCudaStream(qc.cudaStreams[0], qc.Device)
		}
	}
	// Timing and device-status cleanup run whether or not the final batch
	// left an error in qc.Error.
	qc.reportTiming(qc.cudaStreams[0], &start, resultTransferTiming)
	qc.cleanUpDeviceStatus()
	qc.reportTiming(nil, &start, finalCleanupTiming)
}