public PartitionState inflatePartitionState()

in harry-core/src/harry/reconciler/Reconciler.java [83:265]


    public PartitionState inflatePartitionState(final long pd, DataTracker tracker, Query query)
    {
        PartitionState partitionState = new PartitionState(pd, debugCd, schema);

        class Processor extends VisitExecutor
        {
            // Whether or not a partition deletion was encountered on this LTS.
            private boolean hadPartitionDeletion = false;
            private final List<Ranges.Range> rangeDeletes = new ArrayList<>();
            private final List<ReplayingVisitor.Operation> writes = new ArrayList<>();
            private final List<ReplayingVisitor.Operation> columnDeletes = new ArrayList<>();

            @Override
            protected void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind opType)
            {
                if (hadPartitionDeletion)
                    return;

                switch (opType)
                {
                    case DELETE_RANGE:
                        Query query = rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_RANGE);
                        Ranges.Range range = query.toRange(lts);
                        rangeDeletes.add(range);
                        partitionState.delete(range, lts);
                        break;
                    case DELETE_SLICE:
                        query = rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_SLICE);
                        range = query.toRange(lts);
                        rangeDeletes.add(range);
                        partitionState.delete(range, lts);
                        break;
                    case DELETE_ROW:
                        range = new Ranges.Range(cd, cd, true, true, lts);
                        rangeDeletes.add(range);
                        partitionState.delete(cd, lts);
                        break;
                    case DELETE_PARTITION:
                        partitionState.deletePartition(lts);
                        rangeDeletes.clear();
                        writes.clear();
                        columnDeletes.clear();

                        hadPartitionDeletion = true;
                        break;
                    case INSERT_WITH_STATICS:
                    case INSERT:
                    case UPDATE:
                    case UPDATE_WITH_STATICS:
                        if (debugCd != -1 && cd == debugCd)
                            logger.info("Writing {} ({}) at {}/{}", cd, opType, lts, opId);
                        writes.add(new ReplayingVisitor.Operation(cd, opId, opType));
                        break;
                    case DELETE_COLUMN_WITH_STATICS:
                    case DELETE_COLUMN:
                        columnDeletes.add(new ReplayingVisitor.Operation(cd, opId, opType));
                        break;
                    default:
                        throw new IllegalStateException();
                }
            }

            @Override
            protected void beforeLts(long lts, long pd)
            {
                rangeDeletes.clear();
                writes.clear();
                columnDeletes.clear();
                hadPartitionDeletion = false;
            }

            @Override
            protected void afterLts(long lts, long pd)
            {
                if (hadPartitionDeletion)
                    return;

                outer: for (ReplayingVisitor.Operation op : writes)
                {
                    long opId = op.opId;
                    long cd = op.cd;

                    switch (op.opType)
                    {
                        case INSERT_WITH_STATICS:
                        case UPDATE_WITH_STATICS:
                            // We could apply static columns during the first iteration, but it's more convenient
                            // to reconcile static-level deletions.
                            partitionState.writeStaticRow(descriptorSelector.sds(pd, cd, lts, opId, op.opType, schema),
                                                          lts);
                        case INSERT:
                        case UPDATE:
                            if (!query.match(cd))
                            {
                                if (debugCd != -1 && cd == debugCd)
                                    logger.info("Hiding {} at {}/{} because there was no query match", debugCd, lts, opId);
                                continue outer;
                            }

                            for (Ranges.Range range : rangeDeletes)
                            {
                                if (range.timestamp >= lts && range.contains(cd))
                                {
                                    if (debugCd != -1 && cd == debugCd)
                                        logger.info("Hiding {} at {}/{} because of range tombstone {}", debugCd, lts, opId, range);
                                    continue outer;
                                }
                            }

                            partitionState.write(cd,
                                                 descriptorSelector.vds(pd, cd, lts, opId, op.opType, schema),
                                                 lts,
                                                 op.opType == OpSelectors.OperationKind.INSERT || op.opType == OpSelectors.OperationKind.INSERT_WITH_STATICS);
                            break;
                        default:
                            throw new IllegalStateException(op.opType.toString());
                    }
                }

                outer: for (ReplayingVisitor.Operation op : columnDeletes)
                {
                    long opId = op.opId;
                    long cd = op.cd;

                    switch (op.opType)
                    {
                        case DELETE_COLUMN_WITH_STATICS:
                            partitionState.deleteStaticColumns(lts,
                                                               schema.staticColumnsOffset,
                                                               descriptorSelector.columnMask(pd, lts, opId, op.opType),
                                                               schema.staticColumnsMask());
                        case DELETE_COLUMN:
                            if (!query.match(cd))
                            {
                                if (debugCd != -1 && cd == debugCd)
                                    logger.info("Hiding {} at {}/{} because there was no query match", debugCd, lts, opId);
                                continue outer;
                            }

                            for (Ranges.Range range : rangeDeletes)
                            {
                                if (range.timestamp >= lts && range.contains(cd))
                                {
                                    if (debugCd != -1 && cd == debugCd)
                                        logger.info("Hiding {} at {}/{} because of range tombstone {}", debugCd, lts, opId, range);
                                    continue outer;
                                }
                            }

                            partitionState.deleteRegularColumns(lts,
                                                                cd,
                                                                schema.regularColumnsOffset,
                                                                descriptorSelector.columnMask(pd, lts, opId, op.opType),
                                                                schema.regularColumnsMask());
                            break;
                    }
                }
            }

            @Override
            protected void afterBatch(long lts, long pd, long m) {}

            @Override
            protected void beforeBatch(long lts, long pd, long m) {}

            @Override
            public void shutdown() throws InterruptedException {}
        }

        LtsVisitor visitor = visitorFactory.apply(new Processor());

        long currentLts = pdSelector.minLtsFor(pd);
        long maxStarted = tracker.maxStarted();
        while (currentLts <= maxStarted && currentLts >= 0)
        {
            if (tracker.isFinished(currentLts))
                visitor.visit(currentLts);

            currentLts = pdSelector.nextLts(currentLts);
        }

        return partitionState;
    }