in harry-core/src/harry/reconciler/Reconciler.java [83:265]
public PartitionState inflatePartitionState(final long pd, DataTracker tracker, Query query)
{
PartitionState partitionState = new PartitionState(pd, debugCd, schema);
class Processor extends VisitExecutor
{
// Whether or not a partition deletion was encountered on this LTS.
private boolean hadPartitionDeletion = false;
private final List<Ranges.Range> rangeDeletes = new ArrayList<>();
private final List<ReplayingVisitor.Operation> writes = new ArrayList<>();
private final List<ReplayingVisitor.Operation> columnDeletes = new ArrayList<>();
@Override
protected void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind opType)
{
if (hadPartitionDeletion)
return;
switch (opType)
{
case DELETE_RANGE:
Query query = rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_RANGE);
Ranges.Range range = query.toRange(lts);
rangeDeletes.add(range);
partitionState.delete(range, lts);
break;
case DELETE_SLICE:
query = rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_SLICE);
range = query.toRange(lts);
rangeDeletes.add(range);
partitionState.delete(range, lts);
break;
case DELETE_ROW:
range = new Ranges.Range(cd, cd, true, true, lts);
rangeDeletes.add(range);
partitionState.delete(cd, lts);
break;
case DELETE_PARTITION:
partitionState.deletePartition(lts);
rangeDeletes.clear();
writes.clear();
columnDeletes.clear();
hadPartitionDeletion = true;
break;
case INSERT_WITH_STATICS:
case INSERT:
case UPDATE:
case UPDATE_WITH_STATICS:
if (debugCd != -1 && cd == debugCd)
logger.info("Writing {} ({}) at {}/{}", cd, opType, lts, opId);
writes.add(new ReplayingVisitor.Operation(cd, opId, opType));
break;
case DELETE_COLUMN_WITH_STATICS:
case DELETE_COLUMN:
columnDeletes.add(new ReplayingVisitor.Operation(cd, opId, opType));
break;
default:
throw new IllegalStateException();
}
}
@Override
protected void beforeLts(long lts, long pd)
{
rangeDeletes.clear();
writes.clear();
columnDeletes.clear();
hadPartitionDeletion = false;
}
@Override
protected void afterLts(long lts, long pd)
{
if (hadPartitionDeletion)
return;
outer: for (ReplayingVisitor.Operation op : writes)
{
long opId = op.opId;
long cd = op.cd;
switch (op.opType)
{
case INSERT_WITH_STATICS:
case UPDATE_WITH_STATICS:
// We could apply static columns during the first iteration, but it's more convenient
// to reconcile static-level deletions.
partitionState.writeStaticRow(descriptorSelector.sds(pd, cd, lts, opId, op.opType, schema),
lts);
case INSERT:
case UPDATE:
if (!query.match(cd))
{
if (debugCd != -1 && cd == debugCd)
logger.info("Hiding {} at {}/{} because there was no query match", debugCd, lts, opId);
continue outer;
}
for (Ranges.Range range : rangeDeletes)
{
if (range.timestamp >= lts && range.contains(cd))
{
if (debugCd != -1 && cd == debugCd)
logger.info("Hiding {} at {}/{} because of range tombstone {}", debugCd, lts, opId, range);
continue outer;
}
}
partitionState.write(cd,
descriptorSelector.vds(pd, cd, lts, opId, op.opType, schema),
lts,
op.opType == OpSelectors.OperationKind.INSERT || op.opType == OpSelectors.OperationKind.INSERT_WITH_STATICS);
break;
default:
throw new IllegalStateException(op.opType.toString());
}
}
outer: for (ReplayingVisitor.Operation op : columnDeletes)
{
long opId = op.opId;
long cd = op.cd;
switch (op.opType)
{
case DELETE_COLUMN_WITH_STATICS:
partitionState.deleteStaticColumns(lts,
schema.staticColumnsOffset,
descriptorSelector.columnMask(pd, lts, opId, op.opType),
schema.staticColumnsMask());
case DELETE_COLUMN:
if (!query.match(cd))
{
if (debugCd != -1 && cd == debugCd)
logger.info("Hiding {} at {}/{} because there was no query match", debugCd, lts, opId);
continue outer;
}
for (Ranges.Range range : rangeDeletes)
{
if (range.timestamp >= lts && range.contains(cd))
{
if (debugCd != -1 && cd == debugCd)
logger.info("Hiding {} at {}/{} because of range tombstone {}", debugCd, lts, opId, range);
continue outer;
}
}
partitionState.deleteRegularColumns(lts,
cd,
schema.regularColumnsOffset,
descriptorSelector.columnMask(pd, lts, opId, op.opType),
schema.regularColumnsMask());
break;
}
}
}
@Override
protected void afterBatch(long lts, long pd, long m) {}
@Override
protected void beforeBatch(long lts, long pd, long m) {}
@Override
public void shutdown() throws InterruptedException {}
}
LtsVisitor visitor = visitorFactory.apply(new Processor());
long currentLts = pdSelector.minLtsFor(pd);
long maxStarted = tracker.maxStarted();
while (currentLts <= maxStarted && currentLts >= 0)
{
if (tracker.isFinished(currentLts))
visitor.visit(currentLts);
currentLts = pdSelector.nextLts(currentLts);
}
return partitionState;
}