protected UnfilteredPartitionIterator queryDataFromIndex()

in src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java [69:208]


    /**
     * Builds a partition iterator that resolves the hits of one index partition against the base table.
     * <p>
     * Index hits are decoded with {@code index.decodeEntry} and, for each base-table partition they point
     * to, a {@link SinglePartitionReadCommand} is run against {@code index.baseCfs}. The result is passed
     * through {@code filterStaleEntries} and partitions that end up empty are skipped.
     *
     * @param indexKey            the key of the index partition being read; used to decode every index hit
     *                            and handed to {@code filterStaleEntries}
     * @param indexHits           the rows of the index partition; closed when the returned iterator is closed
     * @param command             the user read command, supplying the metadata, {@code nowInSec}, column
     *                            filter, row filter and per-partition clustering filter
     * @param executionController the controller used to read the base table and to obtain the write context
     *                            passed to {@code filterStaleEntries}
     * @return an iterator over the non-empty base-table partitions matching the index hits
     */
    protected UnfilteredPartitionIterator queryDataFromIndex(final DecoratedKey indexKey,
                                                             final RowIterator indexHits,
                                                             final ReadCommand command,
                                                             final ReadExecutionController executionController)
    {
        assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW;

        return new UnfilteredPartitionIterator()
        {
            // The next decoded index hit that has not been consumed yet, or null if one must be fetched.
            private IndexEntry nextEntry;

            // The partition prepared by prepareNext() and not yet handed out by next().
            private UnfilteredRowIterator next;

            public TableMetadata metadata()
            {
                return command.metadata();
            }

            public boolean hasNext()
            {
                return prepareNext();
            }

            public UnfilteredRowIterator next()
            {
                // Honour the Iterator contract instead of silently returning null once exhausted.
                if (!prepareNext())
                    throw new java.util.NoSuchElementException();

                UnfilteredRowIterator toReturn = next;
                next = null;
                return toReturn;
            }

            /**
             * Decodes and returns the next index hit, or null when the index hits are exhausted.
             */
            private IndexEntry decodeNextHit()
            {
                return indexHits.hasNext() ? index.decodeEntry(indexKey, indexHits.next()) : null;
            }

            /**
             * Ensures {@code next} holds a non-empty partition if one remains.
             *
             * @return true if {@code next} is set, false if the index hits are exhausted
             */
            private boolean prepareNext()
            {
                while (true)
                {
                    if (next != null)
                        return true;

                    if (nextEntry == null)
                    {
                        if (!indexHits.hasNext())
                            return false;

                        nextEntry = index.decodeEntry(indexKey, indexHits.next());
                    }

                    SinglePartitionReadCommand dataCmd;
                    DecoratedKey partitionKey = index.baseCfs.decorateKey(nextEntry.indexedKey);
                    List<IndexEntry> entries = new ArrayList<>();
                    if (isStaticColumn())
                    {
                        // The index hit may not match the command key constraint
                        if (!isMatchingEntry(partitionKey, nextEntry, command))
                        {
                            nextEntry = decodeNextHit();
                            continue;
                        }

                        // If the index is on a static column, we just need to do a full read on the partition.
                        // Note that we want to re-use the command.columnFilter() in case of future change.
                        dataCmd = SinglePartitionReadCommand.create(index.baseCfs.metadata(),
                                                                    command.nowInSec(),
                                                                    command.columnFilter(),
                                                                    RowFilter.none(),
                                                                    DataLimits.NONE,
                                                                    partitionKey,
                                                                    command.clusteringIndexFilter(partitionKey));
                        entries.add(nextEntry);
                        nextEntry = decodeNextHit();
                    }
                    else
                    {
                        // Gather all index hits belonging to the same partition and query the data for those hits.
                        // TODO: it's much more efficient to do 1 read for all hits to the same partition than doing
                        // 1 read per index hit. However, this basically mean materializing all hits for a partition
                        // in memory so we should consider adding some paging mechanism. However, index hits should
                        // be relatively small so it's much better than the previous code that was materializing all
                        // *data* for a given partition.
                        BTreeSet.Builder<Clustering<?>> clusterings = BTreeSet.builder(index.baseCfs.getComparator());
                        while (nextEntry != null && partitionKey.getKey().equals(nextEntry.indexedKey))
                        {
                            // We've queried a slice of the index, but some hits may not match some of the clustering column constraints
                            if (isMatchingEntry(partitionKey, nextEntry, command))
                            {
                                clusterings.add(nextEntry.indexedEntryClustering);
                                entries.add(nextEntry);
                            }

                            nextEntry = decodeNextHit();
                        }

                        // Because we've eliminated entries that don't match the clustering columns, it's possible we added nothing
                        if (clusterings.isEmpty())
                            continue;

                        // Query the gathered index hits. We still need to filter stale hits from the resulting query.
                        ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings.build(), false);
                        dataCmd = SinglePartitionReadCommand.create(index.baseCfs.metadata(),
                                                                    command.nowInSec(),
                                                                    command.columnFilter(),
                                                                    command.rowFilter(),
                                                                    DataLimits.NONE,
                                                                    partitionKey,
                                                                    filter,
                                                                    (Index.QueryPlan) null);
                    }

                    // The data iterator is closed right away if it is empty. Otherwise it is stored in 'next',
                    // from where it is either handed out by next() or closed by close() on this iterator,
                    // whichever happens first.
                    UnfilteredRowIterator dataIter =
                        filterStaleEntries(dataCmd.queryMemtableAndDisk(index.baseCfs, executionController),
                                           indexKey.getKey(),
                                           entries,
                                           executionController.getWriteContext(),
                                           command.nowInSec());

                    if (dataIter.isEmpty())
                    {
                        dataIter.close();
                        continue;
                    }

                    next = dataIter;
                    return true;
                }
            }

            public void remove()
            {
                throw new UnsupportedOperationException();
            }

            public void close()
            {
                // Make sure a prepared-but-unconsumed partition is released even if closing the index hits fails.
                try
                {
                    indexHits.close();
                }
                finally
                {
                    if (next != null)
                        next.close();
                }
            }
        };
    }