query/aql_processor.go (1,150 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package query

import (
	"github.com/uber/aresdb/cgoutils"
	"math"
	"unsafe"

	"encoding/binary"
	"encoding/json"
	"github.com/uber/aresdb/memstore"
	memCom "github.com/uber/aresdb/memstore/common"
	"github.com/uber/aresdb/memstore/list"
	queryCom "github.com/uber/aresdb/query/common"
	"github.com/uber/aresdb/query/expr"
	"github.com/uber/aresdb/utils"
	"time"
)

const (
	// hllQueryRequiredMemoryInMB evaluates to 10240.
	// NOTE(review): presumably the device memory (in MB) budgeted for an HLL
	// query; its use is outside this section of the file — confirm.
	hllQueryRequiredMemoryInMB = 10 * 1024
)

// batchTransferExecutor defines the type of the functor to transfer a live batch or a archive batch
// from host memory to device memory. hostVPs will be the columns to be released after transfer. startRow
// is used to slice the vector party.
//
// The remaining results are: deviceColumns, the per-column device slices; firstColumn, the index of a
// transferred column (or -1 when none was transferred); totalBytes and numTransfers, the accumulated
// results of the copy calls; and sizeAfterPrefilter, the row count left after prefilter slicing.
type batchTransferExecutor func(stream unsafe.Pointer) (deviceColumns []deviceVectorPartySlice, hostVPs []memCom.VectorParty,
	firstColumn, startRow, totalBytes, numTransfers, sizeAfterPrefilter int)

// customFilterExecutor is the functor to apply custom filters depends on the batch type. For archive batch,
// the custom filter will be the time filter and will only be applied to first or last batch. For live batch,
// the custom filters will be the cutoff time filter if cutoff is larger than 0, pre-filters and time filters.
// The stream argument is the CUDA stream the filter expressions are processed on.
type customFilterExecutor func(stream unsafe.Pointer)

// ProcessQuery processes the compiled query and executes it on GPU.
func (qc *AQLQueryContext) ProcessQuery(memStore memstore.MemStore) { defer func() { if r := recover(); r != nil { // find out exactly what the error was and set err switch x := r.(type) { case string: qc.Error = utils.StackError(nil, x) case error: qc.Error = utils.StackError(x, "Panic happens when processing query") default: qc.Error = utils.StackError(nil, "Panic happens when processing query %v", x) } utils.GetLogger().Error("Releasing device memory after panic") qc.Release() } }() qc.cudaStreams[0] = cgoutils.CreateCudaStream(qc.Device) qc.cudaStreams[1] = cgoutils.CreateCudaStream(qc.Device) qc.OOPK.currentBatch.device = qc.Device qc.OOPK.LiveBatchStats = oopkQueryStats{ Name2Stage: make(map[stageName]*oopkStageSummaryStats), } qc.OOPK.ArchiveBatchStats = oopkQueryStats{ Name2Stage: make(map[stageName]*oopkStageSummaryStats), } previousBatchExecutor := NewDummyBatchExecutor() start := utils.Now() for joinTableID, join := range qc.Query.Joins { qc.prepareForeignTable(memStore, joinTableID, join) if qc.Error != nil { return } } qc.reportTiming(qc.cudaStreams[0], &start, prepareForeignTableTiming) qc.prepareTimezoneTable(memStore) if qc.Error != nil { return } // prepare geo intersection if qc.OOPK.geoIntersection != nil { shapeExists := qc.prepareForGeoIntersect(memStore) if qc.Error != nil { return } if !shapeExists { // if no shape exist and geo check for point in shape // no need to continue processing batch if qc.OOPK.geoIntersection.inOrOut { return } // if no shape exist and geo check for point not in shape // no need to do geo intersection qc.OOPK.geoIntersection = nil } } qc.initializeNonAggResponse() qc.initResultFlushContext() for _, shardID := range qc.TableScanners[0].Shards { previousBatchExecutor = qc.processShard(memStore, shardID, previousBatchExecutor) if qc.Error != nil { return } if qc.OOPK.done { break } } // query execution for last batch. qc.runBatchExecutor(previousBatchExecutor, true) // this code snippet does the followings: // 1. 
write stats to log. // 2. allocate host buffer for result and copy the result from device to host. // 3. clean up device status buffers if no panic. if qc.Debug { qc.OOPK.LiveBatchStats.writeToLog() qc.OOPK.ArchiveBatchStats.writeToLog() } start = utils.Now() if qc.Error == nil { // Copy the result to host memory. qc.OOPK.ResultSize = qc.OOPK.currentBatch.resultSize if qc.OOPK.IsHLL() { qc.HLLQueryResult, qc.Error = qc.PostprocessAsHLLData() } else { if !qc.IsNonAggregationQuery { // copy dimensions qc.OOPK.dimensionVectorH = cgoutils.HostAlloc(qc.OOPK.ResultSize * qc.OOPK.DimRowBytes) asyncCopyDimensionVector(qc.OOPK.dimensionVectorH, qc.OOPK.currentBatch.dimensionVectorD[0].getPointer(), qc.OOPK.ResultSize, 0, qc.OOPK.NumDimsPerDimWidth, qc.OOPK.ResultSize, qc.OOPK.currentBatch.resultCapacity, cgoutils.AsyncCopyDeviceToHost, qc.cudaStreams[0], qc.Device) // copy measures qc.OOPK.measureVectorH = cgoutils.HostAlloc(qc.OOPK.ResultSize * qc.OOPK.MeasureBytes) cgoutils.AsyncCopyDeviceToHost( qc.OOPK.measureVectorH, qc.OOPK.currentBatch.measureVectorD[0].getPointer(), qc.OOPK.ResultSize*qc.OOPK.MeasureBytes, qc.cudaStreams[0], qc.Device) cgoutils.WaitForCudaStream(qc.cudaStreams[0], qc.Device) } } } qc.reportTiming(qc.cudaStreams[0], &start, resultTransferTiming) qc.cleanUpDeviceStatus() qc.reportTiming(nil, &start, finalCleanupTiming) } func (qc *AQLQueryContext) processShard(memStore memstore.MemStore, shardID int, previousBatchExecutor BatchExecutor) BatchExecutor { var liveRecordsProcessed, archiveRecordsProcessed, liveBatchProcessed, archiveBatchProcessed, liveBytesTransferred, archiveBytesTransferred int shard, err := memStore.GetTableShard(qc.Query.Table, shardID) if err != nil { qc.Error = utils.StackError(err, "failed to get shard %d for table %s", shardID, qc.Query.Table) return previousBatchExecutor } defer shard.Users.Done() var archiveStore *memstore.ArchiveStoreVersion var cutoff uint32 if shard.Schema.Schema.IsFactTable { archiveStore = 
shard.ArchiveStore.GetCurrentVersion() defer archiveStore.Users.Done() cutoff = archiveStore.ArchivingCutoff } // Process live batches. if qc.toTime == nil || cutoff < uint32(qc.toTime.Time.Unix()) { batchIDs, numRecordsInLastBatch := shard.LiveStore.GetBatchIDs() for i, batchID := range batchIDs { if qc.OOPK.done { break } start := utils.Now() batch := shard.LiveStore.GetBatchForRead(batchID) utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).GetTimer(utils.QueryReadLockAcquireTime).Record(utils.Now().Sub(start)) if batch == nil { continue } // For now, dimension table does not persist min and max therefore // we can only skip live batch for fact table. // TODO: Persist min/max/numTrues when snapshotting. if shard.Schema.Schema.IsFactTable && qc.shouldSkipLiveBatch(batch) { batch.RUnlock() qc.OOPK.LiveBatchStats.NumBatchSkipped++ continue } liveBatchProcessed++ size := batch.Capacity if i == len(batchIDs)-1 { size = numRecordsInLastBatch } liveRecordsProcessed += size previousBatchExecutor = qc.processBatch( shard.Schema.Schema.Name, shard.ShardID, &batch.Batch, batchID, size, qc.transferLiveBatch(batch, size), qc.liveBatchCustomFilterExecutor(cutoff), previousBatchExecutor, true) qc.cudaStreams[0], qc.cudaStreams[1] = qc.cudaStreams[1], qc.cudaStreams[0] liveBytesTransferred += qc.OOPK.currentBatch.stats.bytesTransferred } } // Process archive batches. 
if archiveStore != nil && (qc.fromTime == nil || cutoff > uint32(qc.fromTime.Time.Unix())) { scanner := qc.TableScanners[0] for batchID := scanner.ArchiveBatchIDStart; batchID < scanner.ArchiveBatchIDEnd; batchID++ { if qc.OOPK.done { break } archiveBatch := archiveStore.RequestBatch(int32(batchID)) if archiveBatch.Size == 0 { qc.OOPK.ArchiveBatchStats.NumBatchSkipped++ continue } isFirstOrLast := batchID == scanner.ArchiveBatchIDStart || batchID == scanner.ArchiveBatchIDEnd-1 previousBatchExecutor = qc.processBatch( shard.Schema.Schema.Name, shard.ShardID, &archiveBatch.Batch, int32(batchID), archiveBatch.Size, qc.transferArchiveBatch(archiveBatch, isFirstOrLast), qc.archiveBatchCustomFilterExecutor(isFirstOrLast), previousBatchExecutor, false) archiveRecordsProcessed += archiveBatch.Size archiveBatchProcessed++ qc.cudaStreams[0], qc.cudaStreams[1] = qc.cudaStreams[1], qc.cudaStreams[0] archiveBytesTransferred += qc.OOPK.currentBatch.stats.bytesTransferred } } utils.GetReporter(qc.Query.Table, shardID).GetCounter(utils.QueryLiveRecordsProcessed).Inc(int64(liveRecordsProcessed)) utils.GetReporter(qc.Query.Table, shardID).GetCounter(utils.QueryArchiveRecordsProcessed).Inc(int64(archiveRecordsProcessed)) utils.GetReporter(qc.Query.Table, shardID).GetCounter(utils.QueryLiveBatchProcessed).Inc(int64(liveBatchProcessed)) utils.GetReporter(qc.Query.Table, shardID).GetCounter(utils.QueryArchiveBatchProcessed).Inc(int64(archiveBatchProcessed)) utils.GetReporter(qc.Query.Table, shardID).GetCounter(utils.QueryLiveBytesTransferred).Inc(int64(liveBytesTransferred)) utils.GetReporter(qc.Query.Table, shardID).GetCounter(utils.QueryArchiveBytesTransferred).Inc(int64(archiveBytesTransferred)) return previousBatchExecutor } // Release releases all device memory it allocated. It **should only called** when any errors happens while the query is // processed. func (qc *AQLQueryContext) Release() { // release device memory for processing current batch. 
qc.OOPK.currentBatch.cleanupBeforeAggregation() qc.OOPK.currentBatch.swapResultBufferForNextBatch() qc.cleanUpDeviceStatus() qc.ReleaseHostResultsBuffers() } // CleanUpDevice cleans up the device status including // 1. clean up the device buffer for storing results. // 2. clean up the cuda streams func (qc *AQLQueryContext) cleanUpDeviceStatus() { // clean up foreign table memory after query for _, foreignTable := range qc.OOPK.foreignTables { qc.cleanUpForeignTable(foreignTable) } qc.OOPK.foreignTables = nil // release geo pointers if qc.OOPK.geoIntersection != nil { deviceFreeAndSetNil(&qc.OOPK.geoIntersection.shapeLatLongs) } // Destroy streams cgoutils.DestroyCudaStream(qc.cudaStreams[0], qc.Device) cgoutils.DestroyCudaStream(qc.cudaStreams[1], qc.Device) qc.cudaStreams = [2]unsafe.Pointer{nil, nil} // Clean up the device result buffers. qc.OOPK.currentBatch.cleanupDeviceResultBuffers() // Clean up timezone lookup buffer. deviceFreeAndSetNil(&qc.OOPK.currentBatch.timezoneLookupD) } // clean up foreign table func (qc *AQLQueryContext) cleanUpForeignTable(table *foreignTable) { if table != nil { deviceFreeAndSetNil(&table.devicePrimaryKeyPtr) for _, batch := range table.batches { for _, column := range batch { deviceFreeAndSetNil(&column.basePtr) } } table.batches = nil } } // getGeoShapeLatLongSlice format GeoShapeGo into slices of float32 for query purpose // Lats and Longs are stored in the format as [a1,a2,...an,a1,MaxFloat32,b1,bz,...bn] // refer to time_series_aggregate.h for GeoShape struct func getGeoShapeLatLongSlice(shapesLats, shapesLongs []float32, gs memCom.GeoShapeGo) ([]float32, []float32, int) { numPoints := 0 for i, polygon := range gs.Polygons { if len(polygon) > 0 && i > 0 { // write place holder at start of polygon shapesLats = append(shapesLats, math.MaxFloat32) shapesLongs = append(shapesLongs, math.MaxFloat32) // FLT_MAX as placeholder for each polygon numPoints++ } for _, point := range polygon { shapesLats = append(shapesLats, point[0]) 
shapesLongs = append(shapesLongs, point[1]) numPoints++ } } return shapesLats, shapesLongs, numPoints } func (qc *AQLQueryContext) prepareForGeoIntersect(memStore memstore.MemStore) (shapeExists bool) { tableScanner := qc.TableScanners[qc.OOPK.geoIntersection.shapeTableID] shapeColumnID := qc.OOPK.geoIntersection.shapeColumnID tableName := tableScanner.Schema.Schema.Name // geo table is not sharded shard, err := memStore.GetTableShard(tableName, 0) if err != nil { qc.Error = utils.StackError(err, "Failed to get shard for table %s, shard: %d", tableName, 0) return } defer shard.Users.Done() numPointsPerShape := make([]int32, 0, len(qc.OOPK.geoIntersection.shapeUUIDs)) qc.OOPK.geoIntersection.validShapeUUIDs = make([]string, 0, len(qc.OOPK.geoIntersection.shapeUUIDs)) var shapesLats, shapesLongs []float32 var numPoints, totalNumPoints int for _, uuid := range qc.OOPK.geoIntersection.shapeUUIDs { recordID, found := shard.LiveStore.LookupKey([]string{uuid}) if found { batch := shard.LiveStore.GetBatchForRead(recordID.BatchID) if batch != nil { shapeValue := batch.GetDataValue(int(recordID.Index), shapeColumnID) // compiler should have verified the geo column GeoShape type shapesLats, shapesLongs, numPoints = getGeoShapeLatLongSlice(shapesLats, shapesLongs, *(shapeValue.GoVal.(*memCom.GeoShapeGo))) if numPoints > 0 { totalNumPoints += numPoints numPointsPerShape = append(numPointsPerShape, int32(numPoints)) qc.OOPK.geoIntersection.validShapeUUIDs = append(qc.OOPK.geoIntersection.validShapeUUIDs, uuid) shapeExists = true } batch.RUnlock() } } } if !shapeExists { return } numValidShapes := len(numPointsPerShape) shapeIndexs := make([]uint8, totalNumPoints) pointIndex := 0 for shapeIndex, numPoints := range numPointsPerShape { for i := 0; i < int(numPoints); i++ { shapeIndexs[pointIndex] = uint8(shapeIndex) pointIndex++ } } // allocate memory for lats, longs (float32) and numPoints (int32) device vectors latsPtrD := deviceAllocate(totalNumPoints*4*2+totalNumPoints, 
qc.Device) longsPtrD := latsPtrD.offset(totalNumPoints * 4) shapeIndexsD := longsPtrD.offset(totalNumPoints * 4) cgoutils.AsyncCopyHostToDevice(latsPtrD.getPointer(), unsafe.Pointer(&shapesLats[0]), totalNumPoints*4, qc.cudaStreams[0], qc.Device) cgoutils.AsyncCopyHostToDevice(longsPtrD.getPointer(), unsafe.Pointer(&shapesLongs[0]), totalNumPoints*4, qc.cudaStreams[0], qc.Device) cgoutils.AsyncCopyHostToDevice(shapeIndexsD.getPointer(), unsafe.Pointer(&shapeIndexs[0]), totalNumPoints, qc.cudaStreams[0], qc.Device) qc.OOPK.geoIntersection.shapeLatLongs = latsPtrD qc.OOPK.geoIntersection.numShapes = numValidShapes qc.OOPK.geoIntersection.totalNumPoints = totalNumPoints return } // prepare foreign table (allocate and transfer memory) before processing func (qc *AQLQueryContext) prepareForeignTable(memStore memstore.MemStore, joinTableID int, join queryCom.Join) { ft := qc.OOPK.foreignTables[joinTableID] if ft == nil { return } // join only support dimension table for now // and dimension table is not shared shard, err := memStore.GetTableShard(join.Table, 0) if err != nil { qc.Error = utils.StackError(err, "Failed to get shard for table %s, shard: %d", join.Table, 0) return } defer shard.Users.Done() // only need live store for dimension table batchIDs, numRecordsInLastBatch := shard.LiveStore.GetBatchIDs() ft.numRecordsInLastBatch = numRecordsInLastBatch deviceBatches := make([][]deviceVectorPartySlice, len(batchIDs)) // transfer primary key hostPrimaryKeyData := shard.LiveStore.PrimaryKey.LockForTransfer() devicePrimaryKeyPtr := deviceAllocate(hostPrimaryKeyData.NumBytes, qc.Device) cgoutils.AsyncCopyHostToDevice(devicePrimaryKeyPtr.getPointer(), hostPrimaryKeyData.Data, hostPrimaryKeyData.NumBytes, qc.cudaStreams[0], qc.Device) cgoutils.WaitForCudaStream(qc.cudaStreams[0], qc.Device) ft.hostPrimaryKeyData = hostPrimaryKeyData ft.devicePrimaryKeyPtr = devicePrimaryKeyPtr shard.LiveStore.PrimaryKey.UnlockAfterTransfer() // allocate device memory for i, batchID := 
range batchIDs { batch := shard.LiveStore.GetBatchForRead(batchID) if batch == nil { continue } batchIndex := batchID - memstore.BaseBatchID deviceBatches[batchIndex] = make([]deviceVectorPartySlice, len(qc.TableScanners[joinTableID+1].Columns)) size := batch.Capacity if i == len(batchIDs)-1 { size = numRecordsInLastBatch } for i, columnID := range qc.TableScanners[joinTableID+1].Columns { usage := qc.TableScanners[joinTableID+1].ColumnUsages[columnID] if usage&(columnUsedByAllBatches|columnUsedByLiveBatches) != 0 { sourceVP := batch.GetVectorParty(columnID) if sourceVP == nil { continue } hostVPSlice := sourceVP.(memstore.TransferableVectorParty).GetHostVectorPartySlice(0, size) deviceBatches[batchIndex][i] = hostToDeviceColumn(hostVPSlice, qc.Device) copyHostToDevice(hostVPSlice, deviceBatches[batchIndex][i], qc.cudaStreams[0], qc.Device) } } cgoutils.WaitForCudaStream(qc.cudaStreams[0], qc.Device) batch.RUnlock() } ft.batches = deviceBatches } // prepareTimezoneTable func (qc *AQLQueryContext) prepareTimezoneTable(store memstore.MemStore) { if qc.timezoneTable.tableColumn == "" { return } // Timezone table timezoneTableName := utils.GetConfig().Query.TimezoneTable.TableName schema, err := store.GetSchema(timezoneTableName) if err != nil { qc.Error = err return } if schema == nil { qc.Error = utils.StackError(nil, "unknown timezone table %s", timezoneTableName) return } timer := utils.GetRootReporter().GetTimer(utils.TimezoneLookupTableCreationTime) start := utils.Now() defer func() { duration := utils.Now().Sub(start) timer.Record(duration) }() schema.RLock() defer schema.RUnlock() if tzDict, found := schema.EnumDicts[qc.timezoneTable.tableColumn]; found { lookUp := make([]int16, len(tzDict.ReverseDict)) for i := range lookUp { if loc, err := time.LoadLocation(tzDict.ReverseDict[i]); err == nil { _, offset := time.Now().In(loc).Zone() lookUp[i] = int16(offset) } else { qc.Error = utils.StackError(err, "error parsing timezone") return } } sizeInBytes := 
binary.Size(lookUp) lookupPtr := deviceAllocate(sizeInBytes, qc.Device) cgoutils.AsyncCopyHostToDevice(lookupPtr.getPointer(), unsafe.Pointer(&lookUp[0]), sizeInBytes, qc.cudaStreams[0], qc.Device) qc.OOPK.currentBatch.timezoneLookupD = lookupPtr qc.OOPK.currentBatch.timezoneLookupDSize = len(lookUp) } else { qc.Error = utils.StackError(nil, "unknown timezone column %s", qc.timezoneTable.tableColumn) return } } // transferLiveBatch returns a functor to transfer a live batch to device memory. The size parameter will be either the // size of the batch or num records in last batch. hostColumns will always be empty since we should not release a vector // party of a live batch. Start row will always be zero as well. func (qc *AQLQueryContext) transferLiveBatch(batch *memstore.LiveBatch, size int) batchTransferExecutor { return func(stream unsafe.Pointer) (deviceColumns []deviceVectorPartySlice, hostVPs []memCom.VectorParty, firstColumn, startRow, totalBytes, numTransfers, sizeAfterPrefilter int) { // Allocate column inputs. firstColumn = -1 deviceColumns = make([]deviceVectorPartySlice, len(qc.TableScanners[0].Columns)) for i, columnID := range qc.TableScanners[0].Columns { usage := qc.TableScanners[0].ColumnUsages[columnID] if usage&(columnUsedByAllBatches|columnUsedByLiveBatches) != 0 { if firstColumn < 0 { firstColumn = i } sourceVP := batch.GetVectorParty(columnID) if sourceVP == nil { continue } hostColumn := sourceVP.(memstore.TransferableVectorParty).GetHostVectorPartySlice(0, size) deviceColumns[i] = hostToDeviceColumn(hostColumn, qc.Device) b, t := copyHostToDevice(hostColumn, deviceColumns[i], stream, qc.Device) totalBytes += b numTransfers += t } } sizeAfterPrefilter = size return } } // liveBatchTimeFilterExecutor returns a functor to apply custom time filters to live batch. func (qc *AQLQueryContext) liveBatchCustomFilterExecutor(cutoff uint32) customFilterExecutor { return func(stream unsafe.Pointer) { // cutoff filter evaluation. 
// only apply to fact table where cutoff > 0 if cutoff > 0 { qc.OOPK.currentBatch.processExpression( qc.createCutoffTimeFilter(cutoff), nil, qc.TableScanners, qc.OOPK.foreignTables, stream, qc.Device, qc.OOPK.currentBatch.filterAction) } // time filter evaluation for _, filter := range qc.OOPK.TimeFilters { if filter != nil { qc.OOPK.currentBatch.processExpression(filter, nil, qc.TableScanners, qc.OOPK.foreignTables, stream, qc.Device, qc.OOPK.currentBatch.filterAction) } } // prefilter evaluation for _, filter := range qc.OOPK.Prefilters { qc.OOPK.currentBatch.processExpression(filter, nil, qc.TableScanners, qc.OOPK.foreignTables, stream, qc.Device, qc.OOPK.currentBatch.filterAction) } } } // transferArchiveBatch returns the functor to transfer an archive batch to device memory. We will need to release // hostColumns after transfer completes. func (qc *AQLQueryContext) transferArchiveBatch(batch *memstore.ArchiveBatch, isFirstOrLast bool) batchTransferExecutor { return func(stream unsafe.Pointer) (deviceSlices []deviceVectorPartySlice, hostVPs []memCom.VectorParty, firstColumn, startRow, totalBytes, numTransfers, sizeAfterPreFilter int) { matchedColumnUsages := columnUsedByAllBatches if isFirstOrLast { matchedColumnUsages |= columnUsedByFirstArchiveBatch | columnUsedByLastArchiveBatch } // Request columns, prefilter-slicing, allocate column inputs. firstColumn = -1 hostVPs = make([]memCom.VectorParty, len(qc.TableScanners[0].Columns)) hostSlices := make([]memCom.HostVectorPartySlice, len(qc.TableScanners[0].Columns)) deviceSlices = make([]deviceVectorPartySlice, len(qc.TableScanners[0].Columns)) endRow := batch.Size prefilterIndex := 0 // Must iterate in reverse order to apply prefilter slicing properly. 
for i := len(qc.TableScanners[0].Columns) - 1; i >= 0; i-- { columnID := qc.TableScanners[0].Columns[i] usage := qc.TableScanners[0].ColumnUsages[columnID] if usage&matchedColumnUsages != 0 || usage&columnUsedByPrefilter != 0 { // Request/pin column from disk and wait. vp := batch.RequestVectorParty(columnID) vp.WaitForDiskLoad() // prefilter slicing startRow, endRow, hostSlices[i] = qc.prefilterSlice(vp, prefilterIndex, startRow, endRow) prefilterIndex++ if usage&matchedColumnUsages != 0 { hostVPs[i] = vp firstColumn = i deviceSlices[i] = hostToDeviceColumn(hostSlices[i], qc.Device) } else { vp.Release() } } } for i, dstVPSlice := range deviceSlices { columnID := qc.TableScanners[0].Columns[i] usage := qc.TableScanners[0].ColumnUsages[columnID] if usage&matchedColumnUsages != 0 { srcVPSlice := hostSlices[i] b, t := copyHostToDevice(srcVPSlice, dstVPSlice, stream, qc.Device) totalBytes += b numTransfers += t } } sizeAfterPreFilter = endRow - startRow return } } // archiveBatchCustomFilterExecutor returns a functor to apply custom filter to first or last archive batch. func (qc *AQLQueryContext) archiveBatchCustomFilterExecutor(isFirstOrLast bool) customFilterExecutor { return func(stream unsafe.Pointer) { if isFirstOrLast { for _, filter := range qc.OOPK.TimeFilters { if filter != nil { qc.OOPK.currentBatch.processExpression(filter, nil, qc.TableScanners, qc.OOPK.foreignTables, stream, qc.Device, qc.OOPK.currentBatch.filterAction) } } } } } // helper function for copy dimension vector. Returns the total size of dimension vector. 
func asyncCopyDimensionVector(toDimVector, fromDimVector unsafe.Pointer, length, offset int, numDimsPerDimWidth queryCom.DimCountsPerDimWidth, toVectorCapacity, fromVectorCapacity int, copyFunc cgoutils.AsyncMemCopyFunc, stream unsafe.Pointer, device int) { ptrFrom, ptrTo := fromDimVector, toDimVector numNullVectors := 0 for _, numDims := range numDimsPerDimWidth { numNullVectors += int(numDims) } dimBytes := 1 << uint(len(numDimsPerDimWidth)-1) bytesToCopy := length * dimBytes for _, numDim := range numDimsPerDimWidth { for i := 0; i < int(numDim); i++ { ptrTemp := utils.MemAccess(ptrTo, dimBytes*offset) copyFunc(ptrTemp, ptrFrom, bytesToCopy, stream, device) ptrTo = utils.MemAccess(ptrTo, dimBytes*toVectorCapacity) ptrFrom = utils.MemAccess(ptrFrom, dimBytes*fromVectorCapacity) } dimBytes >>= 1 bytesToCopy = length * dimBytes } // copy null bytes for i := 0; i < numNullVectors; i++ { ptrTemp := utils.MemAccess(ptrTo, offset) copyFunc(ptrTemp, ptrFrom, length, stream, device) ptrTo = utils.MemAccess(ptrTo, toVectorCapacity) ptrFrom = utils.MemAccess(ptrFrom, fromVectorCapacity) } } // cleanupDeviceResultBuffers cleans up result buffers and resets result fields. func (bc *oopkBatchContext) cleanupDeviceResultBuffers() { deviceFreeAndSetNil(&bc.dimensionVectorD[0]) deviceFreeAndSetNil(&bc.dimensionVectorD[1]) // ok to free nil vectors even if it's not allocated. 
deviceFreeAndSetNil(&bc.dimIndexVectorD[0]) deviceFreeAndSetNil(&bc.dimIndexVectorD[1]) deviceFreeAndSetNil(&bc.hashVectorD[0]) deviceFreeAndSetNil(&bc.hashVectorD[1]) deviceFreeAndSetNil(&bc.measureVectorD[0]) deviceFreeAndSetNil(&bc.measureVectorD[1]) bc.size = 0 bc.resultSize = 0 bc.resultCapacity = 0 } // clean up memory not used in final aggregation (sort, reduce, hll) // before aggregation happen func (bc *oopkBatchContext) cleanupBeforeAggregation() { for _, column := range bc.columns { deviceFreeAndSetNil(&column.basePtr) } bc.columns = nil deviceFreeAndSetNil(&bc.indexVectorD) deviceFreeAndSetNil(&bc.predicateVectorD) deviceFreeAndSetNil(&bc.geoPredicateVectorD) for _, recordIDsVector := range bc.foreignTableRecordIDsD { deviceFreeAndSetNil(&recordIDsVector) } bc.foreignTableRecordIDsD = nil for _, stackFrame := range bc.exprStackD { deviceFreeAndSetNil(&stackFrame[0]) } bc.exprStackD = nil } // swapResultBufferForNextBatch swaps the two // sets of dim/measure/hash vectors to get ready for the next batch. func (bc *oopkBatchContext) swapResultBufferForNextBatch() { bc.size = 0 bc.dimensionVectorD[0], bc.dimensionVectorD[1] = bc.dimensionVectorD[1], bc.dimensionVectorD[0] bc.measureVectorD[0], bc.measureVectorD[1] = bc.measureVectorD[1], bc.measureVectorD[0] bc.hashVectorD[0], bc.hashVectorD[1] = bc.hashVectorD[1], bc.hashVectorD[0] } // prepareForFiltering prepares the input and the index vectors for filtering. func (bc *oopkBatchContext) prepareForFiltering( columns []deviceVectorPartySlice, firstColumn int, startRow int, stream unsafe.Pointer) { bc.columns = columns bc.startRow = startRow if firstColumn >= 0 { bc.size = columns[firstColumn].length // Allocate twice of the size to save number of allocations of temporary index vector. 
bc.indexVectorD = deviceAllocate(bc.size*4, bc.device) bc.predicateVectorD = deviceAllocate(bc.size, bc.device) bc.baseCountD = columns[firstColumn].counts.offset(columns[firstColumn].countStartIndex * 4) } bc.stats.batchSize = bc.size } // prepareForDimAndMeasureEval ensures that dim/measure vectors have enough // capacity for bc.resultSize+bc.size. func (bc *oopkBatchContext) prepareForDimAndMeasureEval( dimRowBytes int, measureBytes int, numDimsPerDimWidth queryCom.DimCountsPerDimWidth, isHLL bool, useHashReduction bool, stream unsafe.Pointer) { if bc.resultSize+bc.size > bc.resultCapacity { oldCapacity := bc.resultCapacity bc.resultCapacity = bc.resultSize + bc.size // Extra budget for future proofing. bc.resultCapacity += bc.resultCapacity / 8 bc.reallocateResultBuffers(&bc.dimensionVectorD, dimRowBytes, stream, func(to, from unsafe.Pointer) { asyncCopyDimensionVector(to, from, bc.resultSize, 0, numDimsPerDimWidth, bc.resultCapacity, oldCapacity, cgoutils.AsyncCopyDeviceToDevice, stream, bc.device) }) // uint32_t for index value if isHLL || !useHashReduction { bc.reallocateResultBuffers(&bc.dimIndexVectorD, 4, stream, nil) } // uint64_t for hash value // Note: only when aggregate function is hll, we need to reuse vector[0] if isHLL { bc.reallocateResultBuffers(&bc.hashVectorD, 8, stream, func(to, from unsafe.Pointer) { cgoutils.AsyncCopyDeviceToDevice(to, from, bc.resultSize*8, stream, bc.device) }) } else if !useHashReduction { bc.reallocateResultBuffers(&bc.hashVectorD, 8, stream, nil) } bc.reallocateResultBuffers(&bc.measureVectorD, measureBytes, stream, func(to, from unsafe.Pointer) { cgoutils.AsyncCopyDeviceToDevice(to, from, bc.resultSize*measureBytes, stream, bc.device) }) } } // reallocateResultBuffers reallocates the result buffer pair to size // resultCapacity*unitBytes and copies resultSize*unitBytes from input[0] to output[0]. 
// this function will read and modify the device pointers in buffers func (bc *oopkBatchContext) reallocateResultBuffers( buffers *[2]devicePointer, unitBytes int, stream unsafe.Pointer, copyFunc func(to, from unsafe.Pointer)) { // copy previous pointers first input := [2]devicePointer{buffers[0], buffers[1]} // set buffers to null device pointer buffers[0], buffers[1] = nullDevicePointer, nullDevicePointer // make sure input pointers are cleaned up defer func() { deviceFreeAndSetNil(&input[0]) deviceFreeAndSetNil(&input[1]) }() // reallocate new device buffers buffers[0] = deviceAllocate(bc.resultCapacity*unitBytes, bc.device) buffers[1] = deviceAllocate(bc.resultCapacity*unitBytes, bc.device) if copyFunc != nil { copyFunc(buffers[0].getPointer(), input[0].getPointer()) } return } // doProfile checks the corresponding profileName against query parameter // and do cuda profiling for this action if name matches. func (qc *AQLQueryContext) doProfile(action func(), profileName string, stream unsafe.Pointer) { if qc.Profiling == profileName { // explicit waiting for cuda stream to avoid profiling previous actions. cgoutils.WaitForCudaStream(stream, qc.Device) utils.GetQueryLogger().Infof("Starting cuda profiler for %s", profileName) cgoutils.CudaProfilerStart() defer func() { // explicit waiting for cuda stream to wait for completion of current action. cgoutils.WaitForCudaStream(stream, qc.Device) utils.GetQueryLogger().Infof("Stopping cuda profiler for %s", profileName) cgoutils.CudaProfilerStop() }() } action() } // processBatch allocates device memory and starts async input data // transferring to device memory. It then invokes previousBatchExecutor // asynchronously to process the previous batch. When both async operations // finish, it prepares for the current batch execution and returns it as // a function closure to be invoked later. customFilterExecutor is the executor // to apply custom filters for live batch and archive batch. 
// processBatch allocates device memory and starts async input data
// transferring to device memory. It then invokes previousBatchExecutor
// asynchronously to process the previous batch. When both async operations
// finish, it prepares for the current batch execution and returns it as
// a function closure to be invoked later. customFilterExecutor is the executor
// to apply custom filters for live batch and archive batch.
//
// When needToUnlockBatch is true, batch.RUnlock is called exactly once: either
// right after the transfer has completed, or by the deferred function if this
// function exits earlier. If executing the previous batch panicked, the device
// columns of the current batch are freed and the error is re-panicked here.
// If qc.OOPK.done is set after the previous batch ran, the device columns are
// freed and a dummy executor is returned.
func (qc *AQLQueryContext) processBatch(table string, shardID int, batch *memCom.Batch, batchID int32, batchSize int, transferFunc batchTransferExecutor,
	customFilterFunc customFilterExecutor, previousBatchExecutor BatchExecutor, needToUnlockBatch bool) BatchExecutor {
	// Safety net: release the batch read lock if we leave before the explicit
	// unlock below has run (that unlock resets needToUnlockBatch to false).
	defer func() {
		if needToUnlockBatch {
			batch.RUnlock()
		}
	}()

	if qc.Debug {
		// Finish executing previous batch first to avoid timeline overlapping
		qc.runBatchExecutor(previousBatchExecutor, false)
		// The previous batch is done; the goroutine below only runs a dummy.
		previousBatchExecutor = NewDummyBatchExecutor()
	}

	// reset stats.
	qc.OOPK.currentBatch.stats = oopkBatchStats{
		batchID: batchID,
		timings: make(map[stageName]float64),
	}
	// Async transfer.
	start := utils.Now()
	stream := qc.cudaStreams[0]
	deviceSlices, hostVPs, firstColumn, startRow, totalBytes, numTransfers, sizeAfterPreFilter := transferFunc(stream)
	qc.OOPK.currentBatch.stats.bytesTransferred += totalBytes
	qc.OOPK.currentBatch.stats.numTransferCalls += numTransfers

	qc.reportTimingForCurrentBatch(stream, &start, transferTiming)

	// Async execute the previous batch.
	// The channel has capacity 1 so the goroutine can always deliver its
	// single result without blocking.
	executionDone := make(chan struct{ error }, 1)
	go func() {
		// Convert a panic in the executor into an error value sent back to
		// the waiting caller instead of crashing the process.
		defer func() {
			if r := recover(); r != nil {
				var err error
				// find out exactly what the error was and set err
				switch x := r.(type) {
				case string:
					err = utils.StackError(nil, x)
				case error:
					err = utils.StackError(x, "Panic happens when executing query")
				default:
					err = utils.StackError(nil, "Panic happens when executing query %v", x)
				}
				executionDone <- struct{ error }{err}
			}
		}()
		qc.runBatchExecutor(previousBatchExecutor, false)
		// Success: signal completion with a nil error.
		executionDone <- struct{ error }{}
	}()

	// Wait for data transfer of the current batch.
	cgoutils.WaitForCudaStream(stream, qc.Device)
	utils.GetReporter(table, shardID).GetTimer(utils.QueryBatchTransferTime).Record(utils.Now().Sub(start))

	// Host data has been copied; release the host vector parties handed back
	// by the transfer function.
	for _, vp := range hostVPs {
		if vp != nil {
			// only archive vector party will be returned after transfer function
			vp.(memCom.ArchiveVectorParty).Release()
		}
	}

	if needToUnlockBatch {
		batch.RUnlock()
		// Prevent the deferred function from unlocking a second time.
		needToUnlockBatch = false
	}

	// Wait for execution of the previous batch.
	res := <-executionDone
	if res.error != nil {
		// column data transfer for current batch is done
		// need release current batch's column data before panic
		for _, column := range deviceSlices {
			deviceFreeAndSetNil(&column.basePtr)
		}
		panic(res.error)
	}

	if qc.OOPK.done {
		// if the query is already satisfied in the middle, we can skip next batch and return
		for _, column := range deviceSlices {
			deviceFreeAndSetNil(&column.basePtr)
		}
		return NewDummyBatchExecutor()
	}

	// no prefilter slicing in livebatch, startRow is always 0
	qc.OOPK.currentBatch.size = batchSize
	qc.OOPK.currentBatch.sizeAfterPreFilter = sizeAfterPreFilter
	qc.OOPK.currentBatch.prepareForFiltering(deviceSlices, firstColumn, startRow, stream)

	qc.reportTimingForCurrentBatch(stream, &start, prepareForFilteringTiming)
	return NewBatchExecutor(qc, batchID, customFilterFunc, stream, start)
}
align/pad all slices to be pushed func (qc *AQLQueryContext) prefilterSlice(vp memCom.ArchiveVectorParty, prefilterIndex, startRow, endRow int) (int, int, memCom.HostVectorPartySlice) { startIndex, endIndex := 0, vp.GetLength() unmatchedColumn := false scanner := qc.TableScanners[0] if prefilterIndex < len(scanner.EqualityPrefilterValues) { // matched equality filter filterValue := scanner.EqualityPrefilterValues[prefilterIndex] startRow, endRow, startIndex, endIndex = vp.SliceByValue(startRow, endRow, unsafe.Pointer(&filterValue)) } else if prefilterIndex == len(scanner.EqualityPrefilterValues) { // matched range filter // lower bound filterValue := scanner.RangePrefilterValues[0] boundaryType := scanner.RangePrefilterBoundaries[0] if boundaryType != noBoundary { lowerStartRow, lowerEndRow, lowerStartIndex, lowerEndIndex := vp.SliceByValue(startRow, endRow, unsafe.Pointer(&filterValue)) if boundaryType == inclusiveBoundary { startRow, startIndex = lowerStartRow, lowerStartIndex } else { startRow, startIndex = lowerEndRow, lowerEndIndex } } else { // treat as unmatchedColumn when there is one range filter missing unmatchedColumn = true } // SliceByValue of upperBound filterValue = scanner.RangePrefilterValues[1] boundaryType = scanner.RangePrefilterBoundaries[1] if boundaryType != noBoundary { upperStartRow, upperEndRow, upperStartIndex, upperEndIndex := vp.SliceByValue(startRow, endRow, unsafe.Pointer(&filterValue)) if boundaryType == inclusiveBoundary { endRow, endIndex = upperEndRow, upperEndIndex } else { endRow, endIndex = upperStartRow, upperStartIndex } } else { // treat as unmatchedColumn when there is one range filter missing unmatchedColumn = true } } else { unmatchedColumn = true } if unmatchedColumn { // unmatched columns, simply slice based on row number range startIndex, endIndex = vp.SliceIndex(startRow, endRow) } return startRow, endRow, vp.(memstore.TransferableVectorParty).GetHostVectorPartySlice(startIndex, endIndex-startIndex) } // 
calculateMemoryRequirement estimate memory requirement for batch data. func (qc *AQLQueryContext) calculateMemoryRequirement(memStore memstore.MemStore) int { // keep track of max requirement for batch maxBytesRequired := 0 //TODO(jians): hard code hll query memory requirement here for now, //we can track memory usage //based on table, dimensions, duration to do estimation if qc.OOPK.IsHLL() { return hllQueryRequiredMemoryInMB } for _, shardID := range qc.TableScanners[0].Shards { shard, err := memStore.GetTableShard(qc.Query.Table, shardID) if err != nil { qc.Error = utils.StackError(err, "failed to get shard %d for table %s", shardID, qc.Query.Table) return -1 } var archiveStore *memstore.ArchiveStoreVersion var cutoff uint32 if shard.Schema.Schema.IsFactTable { archiveStore = shard.ArchiveStore.GetCurrentVersion() cutoff = archiveStore.ArchivingCutoff } // estimate live batch memory usage if qc.toTime == nil || cutoff < uint32(qc.toTime.Time.Unix()) { batchIDs, _ := shard.LiveStore.GetBatchIDs() // find first non null batch and estimate. 
for _, batchID := range batchIDs { liveBatch := shard.LiveStore.GetBatchForRead(batchID) if liveBatch != nil { batchBytes := qc.estimateLiveBatchMemoryUsage(liveBatch) liveBatch.RUnlock() if batchBytes > maxBytesRequired { maxBytesRequired = batchBytes } break } } } // estimate archive batch memory usage if archiveStore != nil { if qc.fromTime == nil || cutoff > uint32(qc.fromTime.Time.Unix()) { scanner := qc.TableScanners[0] for batchID := scanner.ArchiveBatchIDStart; batchID < scanner.ArchiveBatchIDEnd; batchID++ { archiveBatch := archiveStore.RequestBatch(int32(batchID)) if archiveBatch == nil || archiveBatch.Size == 0 { continue } isFirstOrLast := batchID == scanner.ArchiveBatchIDStart || batchID == scanner.ArchiveBatchIDEnd-1 batchBytes := qc.estimateArchiveBatchMemoryUsage(archiveBatch, isFirstOrLast) if batchBytes > maxBytesRequired { maxBytesRequired = batchBytes } } } archiveStore.Users.Done() } shard.Users.Done() } maxBytesRequired += qc.calculateForeignTableMemUsage(memStore) return maxBytesRequired } // estimateLiveBatchMemoryUsage estimate the GPU memory usage for live batches func (qc *AQLQueryContext) estimateLiveBatchMemoryUsage(batch *memstore.LiveBatch) int { columnMemUsage := 0 for _, columnID := range qc.TableScanners[0].Columns { sourceVP := batch.GetVectorParty(columnID) if sourceVP == nil { continue } if memCom.IsArrayType(sourceVP.GetDataType()) { vp := sourceVP.(*list.LiveVectorParty) // for array live vp, we need to use offset size + pool size, and remove cap size for device columnMemUsage += int(vp.GetTotalBytes()) - vp.GetLength()*4 } else { columnMemUsage += int(sourceVP.GetBytes()) } } if batch.Capacity > qc.maxBatchSizeAfterPrefilter { qc.maxBatchSizeAfterPrefilter = batch.Capacity } totalBytes := qc.estimateMemUsageForBatch(batch.Capacity, columnMemUsage, batch.Capacity) utils.GetQueryLogger().Debugf("Live batch %+v needs memory: %d", batch, totalBytes) return totalBytes } // estimateArchiveBatchMemoryUsage estimate the GPU memory 
usage for archive batch func (qc *AQLQueryContext) estimateArchiveBatchMemoryUsage(batch *memstore.ArchiveBatch, isFirstOrLast bool) int { if batch == nil { return 0 } columnMemUsage := 0 var firstColumnSize int startRow, endRow := 0, batch.Size var hostSlice memCom.HostVectorPartySlice matchedColumnUsages := columnUsedByAllBatches if isFirstOrLast { matchedColumnUsages |= columnUsedByFirstArchiveBatch | columnUsedByLastArchiveBatch } prefilterIndex := 0 // max number of rows after pre-filtering. used for non-agg query maxSizeAfterPreFilter := batch.Size for i := len(qc.TableScanners[0].Columns) - 1; i >= 0; i-- { columnID := qc.TableScanners[0].Columns[i] usage := qc.TableScanners[0].ColumnUsages[columnID] // TODO(cdavid): only read metadata when estimate query memory requirement. sourceVP := batch.RequestVectorParty(columnID) sourceVP.WaitForDiskLoad() if usage&matchedColumnUsages != 0 || usage&columnUsedByPrefilter != 0 { startRow, endRow, hostSlice = qc.prefilterSlice(sourceVP, prefilterIndex, startRow, endRow) if endRow-startRow < maxSizeAfterPreFilter { maxSizeAfterPreFilter = endRow - startRow } prefilterIndex++ if usage&matchedColumnUsages != 0 { if memCom.IsArrayType(hostSlice.ValueType) { columnMemUsage += hostSlice.ValueBytes + hostSlice.Length*8 } else { columnMemUsage += hostSlice.ValueBytes + hostSlice.NullBytes + hostSlice.CountBytes } firstColumnSize = hostSlice.Length } } sourceVP.Release() } if maxSizeAfterPreFilter > qc.maxBatchSizeAfterPrefilter { qc.maxBatchSizeAfterPrefilter = maxSizeAfterPreFilter } totalBytes := qc.estimateMemUsageForBatch(firstColumnSize, columnMemUsage, maxSizeAfterPreFilter) utils.GetQueryLogger().Debugf("Archive batch %d needs memory: %d", batch.BatchID, totalBytes) return totalBytes } // estimateMemUsageForBatch calculates memory usage including: // * Index vector // * Predicate vector // * Dimension // * Measurement // * Sort (hash/index) // * Reduce func (qc *AQLQueryContext) estimateMemUsageForBatch(firstColumnSize, 
columnMemUsage, maxSizeAfterPreFilter int) (memUsage int) { // 1. columnMemUsage memUsageBeforeAgg := columnMemUsage // 2. index vector memory usage (4 bytes each) memUsageBeforeAgg += firstColumnSize * 4 // 3. predicate memory usage (1 byte each) memUsageBeforeAgg += firstColumnSize // 4. record id vector for foreign table (8 bytes each recordID) memUsageBeforeAgg += firstColumnSize * 8 * len(qc.OOPK.foreignTables) // 5. expression eval memory (max scratch space) memUsageBeforeAgg += qc.estimateExpressionEvaluationMemUsage(firstColumnSize) // 6. geoPredicateVector if qc.OOPK.geoIntersection != nil { memUsageBeforeAgg += firstColumnSize * 4 * 2 } // 7. max(memUsageBeforeAgg, sortReduceMemoryUsage) memUsage = memUsageBeforeAgg if !qc.IsNonAggregationQuery { memUsage = int(math.Max(float64(memUsage), float64(estimateSortReduceMemUsage(firstColumnSize)))) } // 8. Dimension vector memory usage (input + output) if qc.IsNonAggregationQuery { memUsage += maxSizeAfterPreFilter * qc.OOPK.DimRowBytes * 2 } else { if qc.OOPK.UseHashReduction() { // For hash reduction, need hash table with int64_t key (8 bytes) and measureBytes value. // The capacity is 2 * size. memUsage += firstColumnSize * (8 + qc.OOPK.MeasureBytes) * 2 } else { // For sort based reduction, need to allocate space for hash vectors (8bytes each) and // dim index vectors (4 bytes each) memUsage += firstColumnSize * (4 + 8) * 2 } memUsage += firstColumnSize * qc.OOPK.DimRowBytes * 2 } // 9. 
Measure vector memory usage (input + output) memUsage += firstColumnSize * qc.OOPK.MeasureBytes * 2 return } // memory usage duration expression (filter, dimension, measure) evaluation func (qc *AQLQueryContext) estimateExpressionEvaluationMemUsage(inputSize int) (memUsage int) { // filter expression evaluation for _, filter := range qc.OOPK.MainTableCommonFilters { _, maxExpMemUsage := estimateScratchSpaceMemUsage(filter, inputSize, true) utils.GetQueryLogger().Debugf("Filter %+v: maxExpMemUsage=%d", filter, maxExpMemUsage) memUsage = int(math.Max(float64(memUsage), float64(maxExpMemUsage))) } for _, filter := range qc.OOPK.ForeignTableCommonFilters { _, maxExpMemUsage := estimateScratchSpaceMemUsage(filter, inputSize, true) utils.GetQueryLogger().Debugf("Filter %+v: maxExpMemUsage=%d", filter, maxExpMemUsage) memUsage = int(math.Max(float64(memUsage), float64(maxExpMemUsage))) } // dimension expression evaluation for _, dimension := range qc.OOPK.Dimensions { _, maxExpMemUsage := estimateScratchSpaceMemUsage(dimension, inputSize, true) utils.GetQueryLogger().Debugf("Dimension %+v: maxExpMemUsage=%d", dimension, maxExpMemUsage) memUsage = int(math.Max(float64(memUsage), float64(maxExpMemUsage))) } // measure expression evaluation _, maxExpMemUsage := estimateScratchSpaceMemUsage(qc.OOPK.Measure, inputSize, true) utils.GetQueryLogger().Debugf("Measure %+v: maxExpMemUsage=%d", qc.OOPK.Measure, maxExpMemUsage) memUsage = int(math.Max(float64(memUsage), float64(maxExpMemUsage))) return memUsage } // Note: we only calculate Sort memory usage // since sort memory usage is larger than reduce // and we only care about the maximum func estimateSortReduceMemUsage(inputSize int) (memUsage int) { // dimension index vector // 4 byte for uint32 // 2 vectors for input and output memUsage += inputSize * 4 * 2 // hash vector // 8 byte for uint64 hash value // 2 vectors for input and output memUsage += inputSize * 8 * 2 // we sort dim index values as value, and hash value as key 
memUsage += inputSize * (8 + 4) return } // estimateScratchSpaceMemUsage calculates memory usage for an expression func estimateScratchSpaceMemUsage(exp expr.Expr, firstColumnSize int, isRoot bool) (int, int) { var currentMemUsage int var maxMemUsage int switch e := exp.(type) { case *expr.ParenExpr: return estimateScratchSpaceMemUsage(e.Expr, firstColumnSize, isRoot) case *expr.UnaryExpr: childCurrentMemUsage, childMaxMemUsage := estimateScratchSpaceMemUsage(e.Expr, firstColumnSize, false) if !isRoot { currentMemUsage = firstColumnSize * 5 } maxMemUsage = int(math.Max(float64(childCurrentMemUsage+currentMemUsage), float64(childMaxMemUsage))) return currentMemUsage, maxMemUsage case *expr.BinaryExpr: lhsCurrentMemUsage, lhsMaxMemUsage := estimateScratchSpaceMemUsage(e.LHS, firstColumnSize, false) rhsCurrentMemUsage, rhsMaxMemUsage := estimateScratchSpaceMemUsage(e.RHS, firstColumnSize, false) if !isRoot { currentMemUsage = firstColumnSize * 5 } childrenMaxMemUsage := math.Max(float64(lhsMaxMemUsage), float64(rhsMaxMemUsage)) maxMemUsage = int(math.Max(float64(currentMemUsage+lhsCurrentMemUsage+rhsCurrentMemUsage), float64(childrenMaxMemUsage))) return currentMemUsage, maxMemUsage default: return 0, 0 } } // calculateForeignTableMemUsage returns how much device memory is needed for foreign table func (qc *AQLQueryContext) calculateForeignTableMemUsage(memStore memstore.MemStore) int { var memUsage int for joinTableID, join := range qc.Query.Joins { // join only support dimension table for now // and dimension table is not shared shard, err := memStore.GetTableShard(join.Table, 0) if err != nil { qc.Error = utils.StackError(err, "Failed to get shard for table %s, shard: %d", join.Table, 0) return 0 } // only need live store for dimension table batchIDs, _ := shard.LiveStore.GetBatchIDs() // primary key memUsage += int(shard.LiveStore.PrimaryKey.AllocatedBytes()) // VPs for _, batchID := range batchIDs { batch := shard.LiveStore.GetBatchForRead(batchID) if batch == 
nil { continue } for _, columnID := range qc.TableScanners[joinTableID+1].Columns { usage := qc.TableScanners[joinTableID+1].ColumnUsages[columnID] if usage&(columnUsedByAllBatches|columnUsedByLiveBatches) != 0 { sourceVP := batch.GetVectorParty(columnID) if sourceVP == nil { continue } memUsage += int(sourceVP.GetBytes()) } } batch.RUnlock() } shard.Users.Done() } return memUsage } // FindDeviceForQuery calls device manager to find a device for the query func (qc *AQLQueryContext) FindDeviceForQuery(memStore memstore.MemStore, preferredDevice int, deviceManager *DeviceManager, timeout int) { memoryRequired := qc.calculateMemoryRequirement(memStore) if qc.Error != nil { return } qc.OOPK.DeviceMemoryRequirement = memoryRequired waitStart := utils.Now() device := deviceManager.FindDevice(qc.Query, memoryRequired, preferredDevice, timeout) if device == -1 { qc.Error = utils.StackError(nil, "Unable to find device to run this query") } qc.OOPK.DurationWaitedForDevice = utils.Now().Sub(waitStart) qc.Device = device } func (qc *AQLQueryContext) runBatchExecutor(e BatchExecutor, isLastBatch bool) { start := utils.Now() e.preExec(isLastBatch, start) e.filter() e.join() e.project() e.reduce() e.postExec(start) } // copyHostToDevice copy vector party slice to device vector party slice func copyHostToDevice(vps memCom.HostVectorPartySlice, deviceVPSlice deviceVectorPartySlice, stream unsafe.Pointer, device int) (bytesCopied, numTransfers int) { if memCom.IsArrayType(vps.ValueType) { // for array data type // copy offset-length cgoutils.AsyncCopyHostToDevice( deviceVPSlice.offsets.getPointer(), vps.Offsets, vps.Length*8, stream, device) bytesCopied += vps.Length * 8 numTransfers++ // copy value cgoutils.AsyncCopyHostToDevice( deviceVPSlice.values.getPointer(), vps.Values, vps.ValueBytes, stream, device) bytesCopied += vps.ValueBytes numTransfers++ return } if vps.ValueBytes > 0 { cgoutils.AsyncCopyHostToDevice( deviceVPSlice.values.getPointer(), vps.Values, vps.ValueBytes, 
stream, device) bytesCopied += vps.ValueBytes numTransfers++ } if vps.NullBytes > 0 { cgoutils.AsyncCopyHostToDevice( deviceVPSlice.nulls.getPointer(), vps.Nulls, vps.NullBytes, stream, device) bytesCopied += vps.NullBytes numTransfers++ } if vps.CountBytes > 0 { cgoutils.AsyncCopyHostToDevice( deviceVPSlice.counts.getPointer(), vps.Counts, vps.CountBytes, stream, device) bytesCopied += vps.CountBytes numTransfers++ } return } func hostToDeviceColumn(hostColumn memCom.HostVectorPartySlice, device int) deviceVectorPartySlice { if memCom.IsArrayType(hostColumn.ValueType) { deviceColumn := deviceVectorPartySlice{ length: hostColumn.Length, valueType: hostColumn.ValueType, defaultValue: hostColumn.DefaultValue, } OffsetBytes := hostColumn.Length * 8 totalColumnBytes := hostColumn.ValueBytes + OffsetBytes if totalColumnBytes > 0 { deviceColumn.basePtr = deviceAllocate(totalColumnBytes, device) deviceColumn.offsets = deviceColumn.basePtr deviceColumn.values = deviceColumn.basePtr.offset(OffsetBytes) deviceColumn.valueOffsetAdjust = hostColumn.ValueOffsetAdjust } return deviceColumn } // fnon-array type deviceColumn := deviceVectorPartySlice{ length: hostColumn.Length, valueType: hostColumn.ValueType, defaultValue: hostColumn.DefaultValue, valueStartIndex: hostColumn.ValueStartIndex, nullStartIndex: hostColumn.NullStartIndex, countStartIndex: hostColumn.CountStartIndex, } totalColumnBytes := hostColumn.ValueBytes + hostColumn.NullBytes + hostColumn.CountBytes if totalColumnBytes > 0 { deviceColumn.basePtr = deviceAllocate(totalColumnBytes, device) if hostColumn.Counts != nil { deviceColumn.counts = deviceColumn.basePtr.offset(0) } if hostColumn.Nulls != nil { deviceColumn.nulls = deviceColumn.basePtr.offset(hostColumn.CountBytes) } deviceColumn.values = deviceColumn.basePtr.offset( hostColumn.NullBytes + hostColumn.CountBytes) } return deviceColumn } // shouldSkipLiveBatch will determine whether we can skip processing a live batch by checking time filter and // eligible 
main table common filters. The batch must be non nil. func (qc *AQLQueryContext) shouldSkipLiveBatch(b *memstore.LiveBatch) bool { candidatesFilters := []expr.Expr{qc.OOPK.TimeFilters[0], qc.OOPK.TimeFilters[1]} candidatesFilters = append(candidatesFilters, qc.OOPK.MainTableCommonFilters...) for _, filter := range candidatesFilters { if shouldSkipLiveBatchWithFilter(b, filter) { return true } } return false } // shouldSkipLiveBatchWithFilter will check max and min for the corresponding column against the filter express and // determines whether we should skip processing this live batch. // Following constraints apply: // 1. Filter must be on main table. // 2. Filter must be a binary expression. // 3. OPs must be one of (EQ, GTE,GE,LTE,LE). // 4. One side of the expr must be VarRef // 5. Another side of the xpr must be NumericalLiteral // 6. ColumnType must be UInt32 func shouldSkipLiveBatchWithFilter(b *memstore.LiveBatch, filter expr.Expr) bool { if filter == nil { return false } if binExpr, ok := filter.(*expr.BinaryExpr); ok { var columnExpr *expr.VarRef var numExpr *expr.NumberLiteral op := binExpr.Op switch op { case expr.GTE, expr.GT, expr.LT, expr.LTE, expr.EQ: default: return false } // First try lhs VarRef, rhs Num. lhsVarRef, lhsOK := binExpr.LHS.(*expr.VarRef) rhsNum, rhsOK := binExpr.RHS.(*expr.NumberLiteral) if lhsOK && rhsOK { columnExpr = lhsVarRef numExpr = rhsNum } else { // Then try rhs VarRef, lhs Num. lhsNum, lhsOK := binExpr.LHS.(*expr.NumberLiteral) rhsVarRef, rhsOK := binExpr.RHS.(*expr.VarRef) if lhsOK && rhsOK { // Swap column to the left and number to right. columnExpr = rhsVarRef numExpr = lhsNum // Invert the OP. switch op { case expr.GTE: op = expr.LTE case expr.GT: op = expr.LT case expr.LTE: op = expr.GTE case expr.LT: op = expr.GT } } } if columnExpr != nil && numExpr != nil { // Time filters and main table filters are guaranteed to be on main table. 
vp := b.GetVectorParty(columnExpr.ColumnID) if vp == nil { return true } if columnExpr.DataType != memCom.Uint32 { return false } num := int64(numExpr.Int) minUint32, maxUint32 := vp.(memCom.LiveVectorParty).GetMinMaxValue() min, max := int64(minUint32), int64(maxUint32) switch op { case expr.GTE: return max < num case expr.GT: return max <= num case expr.LTE: return min > num case expr.LT: return min >= num case expr.EQ: return min > num || max < num } } } return false } func (qc *AQLQueryContext) initializeNonAggResponse() { if qc.IsNonAggregationQuery { headers := make([]string, len(qc.Query.Dimensions)) for i, dim := range qc.Query.Dimensions { headers[i] = dim.Expr } if qc.ResponseWriter != nil { if !qc.DataOnly { headersBytes, _ := json.Marshal(headers) qc.ResponseWriter.Write([]byte(`{"results":[{"headers":`)) qc.ResponseWriter.Write(headersBytes) qc.ResponseWriter.Write([]byte(`,"matrixData":[`)) } } else { // non eager flush qc.Results = make(queryCom.AQLQueryResult) qc.Results.SetHeaders(headers) } } }