in src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java [69:208]
/**
 * Lazily turns the rows of an index partition into partitions read from the base table.
 * <p>
 * Each index hit is decoded into an {@link IndexEntry}.
 * <ul>
 *   <li>If the index is on a static column, every hit that matches the command triggers a read of its base
 *   partition. That read uses the command's own column and clustering filters and no row filter.</li>
 *   <li>Otherwise, consecutive hits that share the same base partition key are grouped. They are fetched with
 *   a single names-filtered read that still applies the command's row filter.</li>
 * </ul>
 * In both cases the result goes through {@code filterStaleEntries}, and partitions that come back empty are
 * skipped.
 * <p>
 * NOTE: all hits for one partition are collected in memory before its base read is issued. This is much
 * cheaper than materializing the partition's data, but a paging mechanism may be worth adding if a single
 * partition can accumulate a very large number of hits.
 *
 * @param indexKey            key of the index partition that {@code indexHits} were read from. It is used
 *                            to decode each hit and is passed to stale-entry filtering.
 * @param indexHits           rows of the index partition; must not carry a static row (asserted). The
 *                            returned iterator closes it when it is closed.
 * @param command             the read command whose column filter, row filter, key/clustering constraints
 *                            and {@code nowInSec} drive the base-table reads
 * @param executionController controller used for the base-table reads. It also supplies the write context
 *                            handed to stale-entry filtering.
 * @return an iterator over the base-table partitions for the index hits, after stale-entry filtering; empty
 *         partitions are never returned
 */
protected UnfilteredPartitionIterator queryDataFromIndex(final DecoratedKey indexKey,
                                                         final RowIterator indexHits,
                                                         final ReadCommand command,
                                                         final ReadExecutionController executionController)
{
    assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW;

    return new UnfilteredPartitionIterator()
    {
        // Decoded hit already pulled from indexHits but not yet consumed by a base-table read.
        private IndexEntry nextEntry;
        // Prepared, non-empty base-table partition that next() has not handed out yet.
        private UnfilteredRowIterator next;

        public TableMetadata metadata()
        {
            return command.metadata();
        }

        public boolean hasNext()
        {
            return prepareNext();
        }

        public UnfilteredRowIterator next()
        {
            // Honour the Iterator contract instead of silently returning null once exhausted.
            if (!prepareNext())
                throw new java.util.NoSuchElementException();

            UnfilteredRowIterator toReturn = next;
            next = null;
            return toReturn;
        }

        /**
         * Ensures {@code next} holds a non-empty base-table partition, consuming index hits as needed.
         *
         * @return true if {@code next} is set, false once the index hits are exhausted
         */
        private boolean prepareNext()
        {
            while (next == null)
            {
                if (nextEntry == null)
                {
                    nextEntry = decodeNextHit();
                    if (nextEntry == null)
                        return false;
                }

                DecoratedKey partitionKey = index.baseCfs.decorateKey(nextEntry.indexedKey);
                List<IndexEntry> entries = new ArrayList<>();
                SinglePartitionReadCommand dataCmd = isStaticColumn()
                                                   ? staticColumnReadCommand(partitionKey, entries)
                                                   : regularColumnReadCommand(partitionKey, entries);

                // Every hit consumed for this partition failed the command's constraints: nothing to read.
                if (dataCmd == null)
                    continue;

                // dataIter is closed right away if it is empty. Otherwise it is assigned to next and is
                // closed either by whoever receives it from next(), or by close() if that comes first.
                @SuppressWarnings("resource")
                UnfilteredRowIterator dataIter =
                    filterStaleEntries(dataCmd.queryMemtableAndDisk(index.baseCfs, executionController),
                                       indexKey.getKey(),
                                       entries,
                                       executionController.getWriteContext(),
                                       command.nowInSec());

                if (dataIter.isEmpty())
                {
                    dataIter.close();
                    continue;
                }

                next = dataIter;
            }
            return true;
        }

        /**
         * Decodes the next index hit, if any.
         *
         * @return the decoded entry, or null if there are no more hits
         */
        private IndexEntry decodeNextHit()
        {
            return indexHits.hasNext() ? index.decodeEntry(indexKey, indexHits.next()) : null;
        }

        /**
         * Consumes the current hit for an index on a static column. Such a hit only identifies the partition,
         * so the partition is read using the command's column and clustering filters and no row filter. The
         * command's column filter is re-used so any future change to it is honoured.
         *
         * @param partitionKey base partition key of the current hit
         * @param entries      receives the hit when it is read, for stale-entry filtering
         * @return the read command, or null if the hit does not match the command's key constraint
         */
        private SinglePartitionReadCommand staticColumnReadCommand(DecoratedKey partitionKey, List<IndexEntry> entries)
        {
            // The index hit may not match the command's key constraint.
            if (!isMatchingEntry(partitionKey, nextEntry, command))
            {
                nextEntry = decodeNextHit();
                return null;
            }

            SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(index.baseCfs.metadata(),
                                                                                   command.nowInSec(),
                                                                                   command.columnFilter(),
                                                                                   RowFilter.none(),
                                                                                   DataLimits.NONE,
                                                                                   partitionKey,
                                                                                   command.clusteringIndexFilter(partitionKey));
            entries.add(nextEntry);
            nextEntry = decodeNextHit();
            return dataCmd;
        }

        /**
         * Consumes all consecutive hits belonging to {@code partitionKey} and builds one read for the
         * clusterings they reference. Doing a single read per partition is much cheaper than one read per
         * hit. Hits that fail the command's clustering constraints are dropped, because the slice of the
         * index that was queried may be wider than those constraints.
         *
         * @param partitionKey base partition key shared by the hits being gathered
         * @param entries      receives every matching hit, for stale-entry filtering
         * @return the read command, or null if none of the consumed hits matched the command
         */
        private SinglePartitionReadCommand regularColumnReadCommand(DecoratedKey partitionKey, List<IndexEntry> entries)
        {
            BTreeSet.Builder<Clustering<?>> clusterings = BTreeSet.builder(index.baseCfs.getComparator());
            while (nextEntry != null && partitionKey.getKey().equals(nextEntry.indexedKey))
            {
                if (isMatchingEntry(partitionKey, nextEntry, command))
                {
                    clusterings.add(nextEntry.indexedEntryClustering);
                    entries.add(nextEntry);
                }
                nextEntry = decodeNextHit();
            }

            // Because non-matching hits were eliminated, it's possible nothing was gathered.
            if (clusterings.isEmpty())
                return null;

            // Query the gathered index hits. Stale hits still need to be filtered from the result.
            ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings.build(), false);
            return SinglePartitionReadCommand.create(index.baseCfs.metadata(),
                                                     command.nowInSec(),
                                                     command.columnFilter(),
                                                     command.rowFilter(),
                                                     DataLimits.NONE,
                                                     partitionKey,
                                                     filter,
                                                     (Index.QueryPlan) null);
        }

        public void remove()
        {
            throw new UnsupportedOperationException();
        }

        public void close()
        {
            // Release the unconsumed partition even if closing the index hits fails. Clearing it also keeps
            // a repeated close() from closing it twice.
            try
            {
                indexHits.close();
            }
            finally
            {
                if (next != null)
                {
                    next.close();
                    next = null;
                }
            }
        }
    };
}