// Excerpt: (*AQLQueryContext).processShard, from query/aql_processor.go (lines 163-259).

// processShard executes the query against a single shard of the query's table.
// It scans the shard's live store (records newer than the archiving cutoff)
// and, for fact tables, the shard's archive store (records at or before the
// cutoff), feeding each batch to qc.processBatch and chaining the resulting
// BatchExecutor onto previousBatchExecutor.
//
// If the shard cannot be resolved, qc.Error is set and the incoming executor
// is returned unchanged. Per-shard processing stats (records, batches, bytes
// transferred) are reported once at the end.
func (qc *AQLQueryContext) processShard(memStore memstore.MemStore, shardID int, previousBatchExecutor BatchExecutor) BatchExecutor {
	var liveRecordsProcessed, archiveRecordsProcessed, liveBatchProcessed, archiveBatchProcessed, liveBytesTransferred, archiveBytesTransferred int
	shard, err := memStore.GetTableShard(qc.Query.Table, shardID)
	if err != nil {
		qc.Error = utils.StackError(err, "failed to get shard %d for table %s",
			shardID, qc.Query.Table)
		return previousBatchExecutor
	}
	// GetTableShard pins the shard for this query; release the pin on return.
	defer shard.Users.Done()

	var archiveStore *memstore.ArchiveStoreVersion
	var cutoff uint32
	if shard.Schema.Schema.IsFactTable {
		// Only fact tables have an archive store. Pin the current version for
		// the duration of the scan and remember its cutoff: records at/before
		// the cutoff live in archive batches, records after it in live batches.
		// For dimension tables cutoff stays 0, so all live batches are scanned.
		archiveStore = shard.ArchiveStore.GetCurrentVersion()
		defer archiveStore.Users.Done()
		cutoff = archiveStore.ArchivingCutoff
	}

	// The reporter and timer are keyed by constants for the life of this call;
	// resolve them once instead of once per metric / per batch.
	reporter := utils.GetReporter(qc.Query.Table, shardID)

	// Process live batches, unless the query's upper time bound falls entirely
	// at or before the archiving cutoff (in which case no live data can match).
	if qc.toTime == nil || cutoff < uint32(qc.toTime.Time.Unix()) {
		readLockTimer := utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).GetTimer(utils.QueryReadLockAcquireTime)
		batchIDs, numRecordsInLastBatch := shard.LiveStore.GetBatchIDs()
		for i, batchID := range batchIDs {
			if qc.OOPK.done {
				break
			}
			start := utils.Now()
			batch := shard.LiveStore.GetBatchForRead(batchID)
			readLockTimer.Record(utils.Now().Sub(start))
			if batch == nil {
				continue
			}

			// For now, dimension table does not persist min and max therefore
			// we can only skip live batch for fact table.
			// TODO: Persist min/max/numTrues when snapshotting.
			if shard.Schema.Schema.IsFactTable && qc.shouldSkipLiveBatch(batch) {
				// GetBatchForRead returned the batch read-locked; on the skip
				// path we must release it here. On the processing path the
				// transfer function presumably owns the unlock — NOTE(review):
				// confirm transferLiveBatch releases the read lock.
				batch.RUnlock()
				qc.OOPK.LiveBatchStats.NumBatchSkipped++
				continue
			}

			liveBatchProcessed++
			// Every batch is full except possibly the last one.
			size := batch.Capacity
			if i == len(batchIDs)-1 {
				size = numRecordsInLastBatch
			}
			liveRecordsProcessed += size
			previousBatchExecutor = qc.processBatch(
				shard.Schema.Schema.Name,
				shard.ShardID,
				&batch.Batch,
				batchID,
				size,
				qc.transferLiveBatch(batch, size),
				qc.liveBatchCustomFilterExecutor(cutoff), previousBatchExecutor, true)
			// Double-buffer: alternate CUDA streams between batches so transfer
			// and compute can overlap.
			qc.cudaStreams[0], qc.cudaStreams[1] = qc.cudaStreams[1], qc.cudaStreams[0]
			liveBytesTransferred += qc.OOPK.currentBatch.stats.bytesTransferred
		}
	}

	// Process archive batches, unless the query's lower time bound starts at or
	// after the cutoff (in which case no archived data can match).
	if archiveStore != nil && (qc.fromTime == nil || cutoff > uint32(qc.fromTime.Time.Unix())) {
		scanner := qc.TableScanners[0]
		for batchID := scanner.ArchiveBatchIDStart; batchID < scanner.ArchiveBatchIDEnd; batchID++ {
			if qc.OOPK.done {
				break
			}
			archiveBatch := archiveStore.RequestBatch(int32(batchID))
			if archiveBatch.Size == 0 {
				qc.OOPK.ArchiveBatchStats.NumBatchSkipped++
				continue
			}
			// Boundary batches may only partially overlap the query's time
			// range and need the extra time filter applied.
			isFirstOrLast := batchID == scanner.ArchiveBatchIDStart || batchID == scanner.ArchiveBatchIDEnd-1
			previousBatchExecutor = qc.processBatch(
				shard.Schema.Schema.Name,
				shard.ShardID,
				&archiveBatch.Batch,
				int32(batchID),
				archiveBatch.Size,
				qc.transferArchiveBatch(archiveBatch, isFirstOrLast),
				qc.archiveBatchCustomFilterExecutor(isFirstOrLast),
				previousBatchExecutor, false)
			archiveRecordsProcessed += archiveBatch.Size
			archiveBatchProcessed++
			// Same stream double-buffering as the live path.
			qc.cudaStreams[0], qc.cudaStreams[1] = qc.cudaStreams[1], qc.cudaStreams[0]
			archiveBytesTransferred += qc.OOPK.currentBatch.stats.bytesTransferred
		}
	}
	reporter.GetCounter(utils.QueryLiveRecordsProcessed).Inc(int64(liveRecordsProcessed))
	reporter.GetCounter(utils.QueryArchiveRecordsProcessed).Inc(int64(archiveRecordsProcessed))
	reporter.GetCounter(utils.QueryLiveBatchProcessed).Inc(int64(liveBatchProcessed))
	reporter.GetCounter(utils.QueryArchiveBatchProcessed).Inc(int64(archiveBatchProcessed))
	reporter.GetCounter(utils.QueryLiveBytesTransferred).Inc(int64(liveBytesTransferred))
	reporter.GetCounter(utils.QueryArchiveBytesTransferred).Inc(int64(archiveBytesTransferred))

	return previousBatchExecutor
}