public Indexer indexerFor()

in src/java/org/apache/cassandra/index/internal/CassandraIndex.java [340:496]


    /**
     * Creates the {@link Indexer} that applies the row-level changes of one partition update to this index.
     *
     * @param key             partition key of the update; {@code key.getKey()} is what gets handed to
     *                        {@code insert} and {@code delete}
     * @param columns         columns touched by the update, checked for the indexed column
     * @param nowInSec        time against which cell liveness is evaluated
     * @param ctx             write context forwarded to every {@code insert} and {@code delete}
     * @param transactionType not consulted by this implementation
     * @param memtable        not consulted by this implementation
     * @return a new indexer, or {@code null} when this is not a primary key index and {@code columns} does not
     *         contain the indexed column
     */
    public Indexer indexerFor(final DecoratedKey key,
                              final RegularAndStaticColumns columns,
                              final long nowInSec,
                              final WriteContext ctx,
                              final IndexTransaction.Type transactionType,
                              Memtable memtable)
    {
        /*
         * Indexes on regular and static columns (the non primary-key ones) only care about updates carrying live
         * data for the column they index. Row or range deletions on their own are of no use to them: the index
         * table cannot be updated without knowing exactly which value is being deleted.
         *
         * In practice this means such indexes are only purged of stale entries on compaction, when both the
         * deletion and the prior data it deletes get resolved. Stale entries are, of course, also filtered on read.
         */
        final boolean updateIsRelevant = isPrimaryKeyIndex() || columns.contains(indexedColumn);
        if (!updateIsRelevant)
            return null;

        return new Indexer()
        {
            public void begin()
            {
                // nothing to prepare
            }

            public void partitionDelete(DeletionTime deletionTime)
            {
                // partition deletions are ignored here (see the comment at the top of indexerFor)
            }

            public void rangeTombstone(RangeTombstone tombstone)
            {
                // range tombstones are ignored here (see the comment at the top of indexerFor)
            }

            public void insertRow(Row row)
            {
                // A static row is skipped unless the indexed column is itself static or a partition key column.
                if (row.isStatic() && !(indexedColumn.isStatic() || indexedColumn.isPartitionKey()))
                    return;

                if (isPrimaryKeyIndex())
                {
                    applyPrimaryKeyUpdate(row.clustering(), primaryKeyLivenessOf(row), row.deletion());
                    return;
                }

                if (indexedColumn.isComplex())
                    indexEach(row.clustering(), row.getComplexColumnData(indexedColumn));
                else
                    indexIfLive(row.clustering(), row.getCell(indexedColumn));
            }

            public void removeRow(Row row)
            {
                // Primary key indexes remove nothing from this callback.
                if (isPrimaryKeyIndex())
                    return;

                if (indexedColumn.isComplex())
                    unindexEach(row.clustering(), row.getComplexColumnData(indexedColumn));
                else
                    unindexIfLive(row.clustering(), row.getCell(indexedColumn));
            }

            public void updateRow(Row oldRow, Row newRow)
            {
                assert oldRow.isStatic() == newRow.isStatic();

                // NOTE(review): this guard is stricter than the one in insertRow, which lets static rows
                // through for partition key columns - confirm the difference is intended.
                if (newRow.isStatic() != indexedColumn.isStatic())
                    return;

                if (isPrimaryKeyIndex())
                    applyPrimaryKeyUpdate(newRow.clustering(), primaryKeyLivenessOf(newRow), newRow.deletion());

                // No early return above: the cell handling below runs for primary key indexes as well.
                // In both branches the new data is indexed before the old data is removed.
                if (indexedColumn.isComplex())
                {
                    indexEach(newRow.clustering(), newRow.getComplexColumnData(indexedColumn));
                    unindexEach(oldRow.clustering(), oldRow.getComplexColumnData(indexedColumn));
                }
                else
                {
                    indexIfLive(newRow.clustering(), newRow.getCell(indexedColumn));
                    unindexIfLive(oldRow.clustering(), oldRow.getCell(indexedColumn));
                }
            }

            public void finish()
            {
                // nothing to flush or release
            }

            /** Indexes every cell of {@code cells}; a {@code null} collection is silently skipped. */
            private void indexEach(Clustering<?> clustering, Iterable<Cell<?>> cells)
            {
                if (cells != null)
                {
                    for (Cell<?> each : cells)
                        indexIfLive(clustering, each);
                }
            }

            /** Inserts an index entry for {@code cell}, provided it is non-null and live at {@code nowInSec}. */
            private void indexIfLive(Clustering<?> clustering, Cell<?> cell)
            {
                if (cell != null && cell.isLive(nowInSec))
                {
                    // The entry's liveness is built from the cell's own timestamp, ttl and local deletion time.
                    insert(key.getKey(),
                           clustering,
                           cell,
                           LivenessInfo.withExpirationTime(cell.timestamp(), cell.ttl(), cell.localDeletionTime()),
                           ctx);
                }
            }

            /** Removes the index entry of every cell of {@code cells}; a {@code null} collection is skipped. */
            private void unindexEach(Clustering<?> clustering, Iterable<Cell<?>> cells)
            {
                if (cells != null)
                {
                    for (Cell<?> each : cells)
                        unindexIfLive(clustering, each);
                }
            }

            /** Deletes the index entry for {@code cell}, provided it is non-null and live at {@code nowInSec}. */
            private void unindexIfLive(Clustering<?> clustering, Cell<?> cell)
            {
                if (cell != null && cell.isLive(nowInSec))
                    delete(key.getKey(), clustering, cell, ctx, nowInSec);
            }

            /**
             * Applies a row's primary key state to the index: an insert when {@code liveness} carries a
             * timestamp, followed by a delete when {@code deletion} is not live. Both may happen for one row.
             */
            private void applyPrimaryKeyUpdate(final Clustering<?> clustering,
                                               final LivenessInfo liveness,
                                               final Row.Deletion deletion)
            {
                final boolean hasTimestamp = liveness.timestamp() != LivenessInfo.NO_TIMESTAMP;
                if (hasTimestamp)
                    insert(key.getKey(), clustering, null, liveness, ctx);

                final boolean rowDeleted = !deletion.isLive();
                if (rowDeleted)
                    delete(key.getKey(), clustering, deletion.time(), ctx);
            }

            /**
             * Computes the liveness used for a primary key index entry: the ttl of the row's primary key
             * liveness info, paired with the greatest timestamp found among that info and the row's live cells.
             */
            private LivenessInfo primaryKeyLivenessOf(Row row)
            {
                final LivenessInfo rowLiveness = row.primaryKeyLivenessInfo();
                long newestTimestamp = rowLiveness.timestamp();
                final int ttl = rowLiveness.ttl();

                // Only cells live at nowInSec may raise the timestamp; the ttl is never taken from a cell.
                for (Cell<?> cell : row.cells())
                {
                    if (cell.isLive(nowInSec))
                        newestTimestamp = Math.max(newestTimestamp, cell.timestamp());
                }

                return LivenessInfo.create(newestTimestamp, ttl, nowInSec);
            }
        };
    }