func()

in query/aql_processor.go [985:1053]


func (qc *AQLQueryContext) calculateMemoryRequirement(memStore memstore.MemStore) int {
	// keep track of max requirement for batch
	maxBytesRequired := 0

	//TODO(jians): hard code hll query memory requirement here for now,
	//we can track memory usage
	//based on table, dimensions, duration to do estimation
	if qc.OOPK.IsHLL() {
		return hllQueryRequiredMemoryInMB
	}

	for _, shardID := range qc.TableScanners[0].Shards {
		shard, err := memStore.GetTableShard(qc.Query.Table, shardID)
		if err != nil {
			qc.Error = utils.StackError(err, "failed to get shard %d for table %s",
				shardID, qc.Query.Table)
			return -1
		}

		var archiveStore *memstore.ArchiveStoreVersion
		var cutoff uint32
		if shard.Schema.Schema.IsFactTable {
			archiveStore = shard.ArchiveStore.GetCurrentVersion()
			cutoff = archiveStore.ArchivingCutoff
		}

		// estimate live batch memory usage
		if qc.toTime == nil || cutoff < uint32(qc.toTime.Time.Unix()) {
			batchIDs, _ := shard.LiveStore.GetBatchIDs()

			// find first non null batch and estimate.
			for _, batchID := range batchIDs {
				liveBatch := shard.LiveStore.GetBatchForRead(batchID)
				if liveBatch != nil {
					batchBytes := qc.estimateLiveBatchMemoryUsage(liveBatch)
					liveBatch.RUnlock()

					if batchBytes > maxBytesRequired {
						maxBytesRequired = batchBytes
					}
					break
				}
			}
		}

		// estimate archive batch memory usage
		if archiveStore != nil {
			if qc.fromTime == nil || cutoff > uint32(qc.fromTime.Time.Unix()) {
				scanner := qc.TableScanners[0]
				for batchID := scanner.ArchiveBatchIDStart; batchID < scanner.ArchiveBatchIDEnd; batchID++ {
					archiveBatch := archiveStore.RequestBatch(int32(batchID))
					if archiveBatch == nil || archiveBatch.Size == 0 {
						continue
					}
					isFirstOrLast := batchID == scanner.ArchiveBatchIDStart || batchID == scanner.ArchiveBatchIDEnd-1
					batchBytes := qc.estimateArchiveBatchMemoryUsage(archiveBatch, isFirstOrLast)
					if batchBytes > maxBytesRequired {
						maxBytesRequired = batchBytes
					}
				}
			}
			archiveStore.Users.Done()
		}
		shard.Users.Done()
	}

	maxBytesRequired += qc.calculateForeignTableMemUsage(memStore)
	return maxBytesRequired
}