in query/aql_processor.go [163:259]
// processShard runs the query against a single shard of qc.Query.Table.
//
// It first walks the shard's live batches and then, for fact tables only, the
// archive batches in the range [ArchiveBatchIDStart, ArchiveBatchIDEnd) taken
// from qc.TableScanners[0]. Every batch that is not skipped is handed to
// qc.processBatch; previousBatchExecutor is threaded through each of those
// calls and the value returned by the last one is returned to the caller.
//
// If the shard cannot be fetched from memStore, qc.Error is set and
// previousBatchExecutor is returned unchanged. Either pass stops early once
// qc.OOPK.done is set. Before returning, the per-shard record, batch and byte
// counts for both passes are added to the shard's reporter counters.
func (qc *AQLQueryContext) processShard(memStore memstore.MemStore, shardID int, previousBatchExecutor BatchExecutor) BatchExecutor {
	shard, err := memStore.GetTableShard(qc.Query.Table, shardID)
	if err != nil {
		qc.Error = utils.StackError(err, "failed to get shard %d for table %s", shardID, qc.Query.Table)
		return previousBatchExecutor
	}
	defer shard.Users.Done()

	// Per-shard tallies, reported once at the end of the function.
	var (
		liveRecords, liveBatches, liveBytes          int
		archiveRecords, archiveBatches, archiveBytes int
	)

	// Only fact tables get an archive store version. For any other table
	// archiveStore stays nil (so the archive pass is skipped) and cutoff
	// stays at zero.
	var (
		archiveStore *memstore.ArchiveStoreVersion
		cutoff       uint32
	)
	if shard.Schema.Schema.IsFactTable {
		archiveStore = shard.ArchiveStore.GetCurrentVersion()
		defer archiveStore.Users.Done()
		cutoff = archiveStore.ArchivingCutoff
	}

	// Live pass: needed when the query has no upper time bound, or when that
	// bound lies after the archiving cutoff.
	scanLive := qc.toTime == nil || uint32(qc.toTime.Time.Unix()) > cutoff
	if scanLive {
		batchIDs, lastBatchRecords := shard.LiveStore.GetBatchIDs()
		for idx, batchID := range batchIDs {
			if qc.OOPK.done {
				break
			}

			// Time how long GetBatchForRead takes and report it as
			// QueryReadLockAcquireTime.
			lockStart := utils.Now()
			liveBatch := shard.LiveStore.GetBatchForRead(batchID)
			lockTimer := utils.GetReporter(shard.Schema.Schema.Name, shard.ShardID).GetTimer(utils.QueryReadLockAcquireTime)
			lockTimer.Record(utils.Now().Sub(lockStart))
			if liveBatch == nil {
				continue
			}

			// For now, dimension tables do not persist min and max, so live
			// batches can only be skipped for fact tables.
			// TODO: Persist min/max/numTrues when snapshotting.
			if shard.Schema.Schema.IsFactTable && qc.shouldSkipLiveBatch(liveBatch) {
				// The batch is not handed to processBatch on this path, so it
				// is unlocked here.
				liveBatch.RUnlock()
				qc.OOPK.LiveBatchStats.NumBatchSkipped++
				continue
			}

			// Every batch is counted at full capacity except the last one,
			// which holds lastBatchRecords records.
			numRecords := liveBatch.Capacity
			if idx == len(batchIDs)-1 {
				numRecords = lastBatchRecords
			}
			liveBatches++
			liveRecords += numRecords

			previousBatchExecutor = qc.processBatch(
				shard.Schema.Schema.Name,
				shard.ShardID,
				&liveBatch.Batch,
				batchID,
				numRecords,
				qc.transferLiveBatch(liveBatch, numRecords),
				qc.liveBatchCustomFilterExecutor(cutoff),
				previousBatchExecutor,
				true,
			)

			// Swap the two entries of qc.cudaStreams before the next batch.
			qc.cudaStreams[0], qc.cudaStreams[1] = qc.cudaStreams[1], qc.cudaStreams[0]
			liveBytes += qc.OOPK.currentBatch.stats.bytesTransferred
		}
	}

	// Archive pass: needed when an archive store exists and the query has no
	// lower time bound, or that bound lies before the archiving cutoff.
	scanArchive := archiveStore != nil && (qc.fromTime == nil || uint32(qc.fromTime.Time.Unix()) < cutoff)
	if scanArchive {
		scanner := qc.TableScanners[0]
		for id := scanner.ArchiveBatchIDStart; id < scanner.ArchiveBatchIDEnd && !qc.OOPK.done; id++ {
			archived := archiveStore.RequestBatch(int32(id))
			if archived.Size == 0 {
				// Empty archive batches are counted as skipped.
				qc.OOPK.ArchiveBatchStats.NumBatchSkipped++
				continue
			}

			// The first and last batch of the scan range are flagged so the
			// transfer and filter steps can treat them differently.
			atRangeEdge := id == scanner.ArchiveBatchIDStart || id == scanner.ArchiveBatchIDEnd-1

			previousBatchExecutor = qc.processBatch(
				shard.Schema.Schema.Name,
				shard.ShardID,
				&archived.Batch,
				int32(id),
				archived.Size,
				qc.transferArchiveBatch(archived, atRangeEdge),
				qc.archiveBatchCustomFilterExecutor(atRangeEdge),
				previousBatchExecutor,
				false,
			)
			archiveRecords += archived.Size
			archiveBatches++

			// Swap the two entries of qc.cudaStreams before the next batch.
			qc.cudaStreams[0], qc.cudaStreams[1] = qc.cudaStreams[1], qc.cudaStreams[0]
			archiveBytes += qc.OOPK.currentBatch.stats.bytesTransferred
		}
	}

	// Publish the per-shard tallies.
	utils.GetReporter(qc.Query.Table, shardID).GetCounter(utils.QueryLiveRecordsProcessed).Inc(int64(liveRecords))
	utils.GetReporter(qc.Query.Table, shardID).GetCounter(utils.QueryArchiveRecordsProcessed).Inc(int64(archiveRecords))
	utils.GetReporter(qc.Query.Table, shardID).GetCounter(utils.QueryLiveBatchProcessed).Inc(int64(liveBatches))
	utils.GetReporter(qc.Query.Table, shardID).GetCounter(utils.QueryArchiveBatchProcessed).Inc(int64(archiveBatches))
	utils.GetReporter(qc.Query.Table, shardID).GetCounter(utils.QueryLiveBytesTransferred).Inc(int64(liveBytes))
	utils.GetReporter(qc.Query.Table, shardID).GetCounter(utils.QueryArchiveBytesTransferred).Inc(int64(archiveBytes))
	return previousBatchExecutor
}