in src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java [727:891]
/**
 * Reads this command's partition from the memtables and live sstables of {@code cfs}.
 * <p>
 * Names queries on non-counter tables that query no multi-cell column, and that are not tracking repaired
 * status, are delegated to {@code queryMemtableAndSSTablesInTimestampOrder}. Every other query collects one
 * iterator per memtable holding the partition and one per sstable that may contribute to the result. It then
 * hands the finalized iterators to {@code withSSTablesIterated}.
 * <p>
 * If a {@code RuntimeException} or {@code Error} is thrown while collecting, the input collector is closed
 * (any failure while closing is attached as a suppressed exception) and the original throwable is rethrown.
 *
 * @param cfs        the table to read from
 * @param controller the controller for this read. It is asked whether repaired status is being tracked, and it
 *                   is updated with the minimum local deletion time of every unrepaired source that is read.
 * @return an iterator over the partition's unfiltered content, or an empty iterator if no memtable or sstable
 *         contributed an iterator
 */
private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, ReadExecutionController controller)
{
    /*
     * We have 2 main strategies:
     * 1) We query memtables and sstables simultaneously. This is our most generic strategy and the one we use
     *    unless we have a names filter that we know we can optimize further.
     * 2) If we have a names filter (so we query specific rows), we can make a bet: that all columns for all
     *    queried rows will have data in the most recent sstable(s), thus saving us from reading older ones.
     *    This does imply we have a way to guarantee we have all the data for what is queried, which is only
     *    possible for names queries and if we have neither non-frozen collections/UDTs nor counters.
     *    If a non-frozen collection or UDT is queried we can't guarantee that an older sstable won't have some
     *    elements that weren't in the most recent sstables.
     *    Counters are intrinsically a collection of shards and so have the same problem.
     *    Counter tables are also special in the sense that their rows do not have primary key liveness
     *    as INSERT statements are not supported on counter tables. Due to that, even if only the primary key
     *    columns were queried, querying SSTables in timestamp order will always be less efficient for counter
     *    tables.
     *    Also, if tracking repaired data then we skip this optimization so we can collate the repaired sstables
     *    and generate a digest over their merge, which precludes an early return.
     */
    ClusteringIndexFilter filter = clusteringIndexFilter();
    if (filter instanceof ClusteringIndexNamesFilter
        && !metadata().isCounter()
        && !queriesMulticellType()
        && !controller.isTrackingRepairedStatus())
    {
        return queryMemtableAndSSTablesInTimestampOrder(cfs, (ClusteringIndexNamesFilter) filter, controller);
    }

    Tracing.trace("Acquiring sstable references");
    ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));

    /*
     * We can't eliminate full sstables based on the timestamp of what we've already read, as
     * queryMemtableAndSSTablesInTimestampOrder does. We still want to eliminate sstables whose
     * maxTimestamp < mostRecentPartitionTombstone we've read. We rely on the sstable ordering by
     * maxTimestamp since, if
     *   maxTimestamp_s1 < maxTimestamp_s0,
     * we're guaranteed that s1 cannot have a row tombstone such that
     *   timestamp(tombstone) > maxTimestamp_s0
     * since we necessarily have
     *   timestamp(tombstone) <= maxTimestamp_s1
     * In other words, iterating in descending maxTimestamp order allows us to do our
     * mostRecentPartitionTombstone elimination in one pass, and minimizes the number of sstables for which we
     * read a partition tombstone.
     * The list is sorted once, here, and is not reordered before the sstable loop below.
     */
    view.sstables.sort(SSTableReader.maxTimestampDescending);

    long mostRecentPartitionTombstone = Long.MIN_VALUE;
    InputCollector<UnfilteredRowIterator> inputCollector = iteratorsForPartition(view, controller);
    try
    {
        SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();

        for (Memtable memtable : view.memtables)
        {
            UnfilteredRowIterator iter = memtable.rowIterator(partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed(), metricsCollector);
            // a null iterator means this memtable holds nothing for the partition
            if (iter == null)
                continue;

            // Memtable data is always considered unrepaired
            controller.updateMinOldestUnrepairedTombstone(memtable.getMinLocalDeletionTime());
            inputCollector.addMemtableIterator(RTBoundValidator.validate(iter, RTBoundValidator.Stage.MEMTABLE, false));

            mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
                                                    iter.partitionLevelDeletion().markedForDeleteAt());
        }

        int nonIntersectingSSTables = 0;
        int includedDueToTombstones = 0;

        if (controller.isTrackingRepairedStatus())
            Tracing.trace("Collecting data from sstables and tracking repaired status");

        for (SSTableReader sstable : view.sstables)
        {
            // If we've already seen a partition tombstone with a timestamp greater than the most recent
            // update to this sstable, we can skip it and, given the ordering above, every remaining sstable.
            // If we're tracking repaired status, we mark the repaired digest inconclusive, as other replicas
            // may not have seen this partition delete and so could include data from this sstable (or
            // others) in their digests.
            if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
            {
                inputCollector.markInconclusive();
                break;
            }

            boolean intersects = intersects(sstable);
            boolean hasRequiredStatics = hasRequiredStatics(sstable);
            boolean hasPartitionLevelDeletions = hasPartitionLevelDeletions(sstable);

            if (!intersects && !hasRequiredStatics && !hasPartitionLevelDeletions)
            {
                nonIntersectingSSTables++;
                continue;
            }

            if (intersects || hasRequiredStatics)
            {
                if (!sstable.isRepaired())
                    controller.updateMinOldestUnrepairedTombstone(sstable.getMinLocalDeletionTime());

                // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
                UnfilteredRowIterator iter = intersects ? makeRowIteratorWithLowerBound(cfs, sstable, metricsCollector)
                                                        : makeRowIteratorWithSkippedNonStaticContent(cfs, sstable, metricsCollector);

                inputCollector.addSSTableIterator(sstable, iter);
                mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
                                                        iter.partitionLevelDeletion().markedForDeleteAt());
            }
            else
            {
                nonIntersectingSSTables++;
                // If the sstable contained range or cell tombstones, it would intersect. Since we are here,
                // there are no cell or range tombstones we are interested in (due to the filter). However,
                // we know that there are partition level deletions in this sstable, and we need to make an
                // iterator to figure out whether one applies (see `StatsMetadata.hasPartitionLevelDeletions`).
                // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
                UnfilteredRowIterator iter = makeRowIteratorWithSkippedNonStaticContent(cfs, sstable, metricsCollector);

                // If the sstable contains a partition delete, then we must include it regardless of whether it
                // shadows any other data seen locally, as we can't guarantee that other replicas have seen it.
                if (!iter.partitionLevelDeletion().isLive())
                {
                    if (!sstable.isRepaired())
                        controller.updateMinOldestUnrepairedTombstone(sstable.getMinLocalDeletionTime());
                    inputCollector.addSSTableIterator(sstable, iter);
                    includedDueToTombstones++;
                    mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
                                                            iter.partitionLevelDeletion().markedForDeleteAt());
                }
                else
                {
                    // not handed to inputCollector, so nothing else will close it
                    iter.close();
                }
            }
        }

        if (Tracing.isTracing())
            Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
                          nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);

        if (inputCollector.isEmpty())
            return EmptyIterators.unfilteredRow(cfs.metadata(), partitionKey(), filter.isReversed());

        StorageHook.instance.reportRead(cfs.metadata().id, partitionKey());

        List<UnfilteredRowIterator> iterators = inputCollector.finalizeIterators(cfs, nowInSec(), controller.oldestUnrepairedTombstone());
        return withSSTablesIterated(iterators, cfs.metric, metricsCollector);
    }
    catch (RuntimeException | Error e)
    {
        // release every iterator collected so far; a failure to close must not mask the original problem
        try
        {
            inputCollector.close();
        }
        catch (Exception e1)
        {
            e.addSuppressed(e1);
        }
        throw e;
    }
}