in accord-core/src/main/java/accord/local/cfk/Pruning.java [315:489]
/**
 * Builds a new {@link CommandsForKey} from {@code cfk} in which entries of {@code cfk.byId} located at indexes
 * below {@code pos} have been filtered, and the matching entries have been removed from
 * {@code cfk.committedByExecuteAt}.
 *
 * The work is done in two passes that must agree with each other:
 * <ol>
 *   <li>a reverse scan of {@code byId[pos-1 .. 0]} deciding, per status, whether each entry is retained;</li>
 *   <li>a reverse scan of {@code committedByExecuteAt} below the position of {@code newPrunedBefore}, re-deriving
 *       which entries the first pass removed (directly when {@code missing()} is {@code NO_TXNIDS}, otherwise via
 *       the {@code removedExecuteAts} side list recorded by the first pass).</li>
 * </ol>
 *
 * @param cfk             the collection to prune; it is not modified
 * @param newPrunedBefore the new prune bound; must compare greater than {@code cfk.prunedBefore()} (both itself and
 *                        its {@code executeAt}) under {@code compareSimultaneousEpochAndHlc}, and must satisfy
 *                        {@code mayExecute()}
 * @param pos             exclusive upper index into {@code cfk.byId} of the region scanned for pruning; the final
 *                        invariant check requires that {@code cfk.byId[pos]} is {@code newPrunedBefore}
 * @return {@code cfk} itself when every entry below {@code pos} is retained, otherwise a newly constructed
 *         {@code CommandsForKey}
 * @throws UnhandledEnum if an entry below {@code pos} has a status not listed in the switch
 */
static CommandsForKey pruneBefore(CommandsForKey cfk, TxnInfo newPrunedBefore, int pos)
{
// argument checks: the bound must strictly advance (both the id and its executeAt), and must be an executing txn
Invariants.requireArgument(newPrunedBefore.compareSimultaneousEpochAndHlc(cfk.prunedBefore()) > 0, "Expect new prunedBefore to have > HLC and >= epoch (%s vs %s)", newPrunedBefore, cfk.prunedBefore());
Invariants.requireArgument(newPrunedBefore.executeAt.compareSimultaneousEpochAndHlc(cfk.prunedBefore().executeAt) > 0, "Expect new prunedBefore.executeAt to have > HLC and >= epoch (%s vs %s)", newPrunedBefore.executeAt, cfk.prunedBefore().executeAt);
Invariants.requireArgument(newPrunedBefore.mayExecute());
TxnInfo[] byId = cfk.byId;
TxnInfo[] committedByExecuteAt = cfk.committedByExecuteAt;
int minUndecidedById;
// retainCount: entries below pos kept in byId; removedCommittedCount: entries to drop from committedByExecuteAt
int retainCount = 0, removedCommittedCount = 0;
// a store of committed executeAts we have removed where we cannot otherwise cheaply infer it
Object[] removedExecuteAts = NO_TXNIDS;
int removedExecuteAtCount = 0;
// may be null; when null the only bound ever consulted below is newPrunedBefore
// (presumably maps an epoch to the bound to use for txns of that epoch - confirm against buildEpochPrunedBefores)
Long2ObjectHashMap<TxnInfo> epochPrunedBefores = buildEpochPrunedBefores(byId, committedByExecuteAt, newPrunedBefore);
TxnInfo[] newById;
{
minUndecidedById = cfk.minUndecidedById;
// value of retainCount at the moment the old minUndecidedById entry was retained; used to derive its new index
int minUndecidedByIdDelta = 0;
RecursiveObjectBuffers<TxnId> missingBuffers = new RecursiveObjectBuffers<>(cachedTxnIds());
// running union of missing() collections; seeded from the new bound and extended at the linearUnion call below
TxnId[] mergedMissing = newPrunedBefore.missing();
int mergedMissingCount = mergedMissing.length;
TxnInfo activePruneEpochBefore = newPrunedBefore; // note that if activePrunedBefore != newPrunedBefore, we compare with executeAt
long activePruneEpoch = activePruneEpochBefore.epoch();
// NOTE(review): this buffer is obtained from cachedAny() but nothing in this method hands it back
// (compare removedExecuteAts, which is passed to cachedAny().forceDiscard at the end) - confirm this is intended
Object[] newByIdBuffer = cachedAny().get(pos);
// scan downwards from pos-1; retained entries are written from index pos-1 downwards, so the occupied region
// [pos - retainCount, pos) of the buffer stays in the original byId order
for (int i = pos - 1 ; i >= 0 ; --i)
{
TxnInfo txn = byId[i];
switch (txn.status())
{
default: throw new UnhandledEnum(txn.status());
case COMMITTED:
case STABLE:
// executing committed/stable txns are always retained
if (txn.mayExecute())
{
newByIdBuffer[pos - ++retainCount] = txn;
}
else
{
// refresh the active bound when we cross into a different epoch (only if per-epoch bounds exist);
// the lookup may yield null, in which case neither condition below holds and txn is dropped
long epoch = txn.epoch();
if (epoch != activePruneEpoch && epochPrunedBefores != null)
{
activePruneEpochBefore = epochPrunedBefores.get(epoch);
activePruneEpoch = epoch;
}
// retain only if txn is itself the active bound (reference equality), or the active bound is
// newPrunedBefore and txn's executeAt is not before newPrunedBefore.executeAt
if (activePruneEpochBefore == txn || (activePruneEpochBefore == newPrunedBefore && activePruneEpochBefore.executeAt.compareTo(txn.executeAt) <= 0))
newByIdBuffer[pos - ++retainCount] = txn;
}
break;
case TRANSITIVE:
case TRANSITIVE_VISIBLE:
case PREACCEPTED_WITHOUT_DEPS:
case PREACCEPTED_WITH_DEPS:
case PREACCEPTED_COORD_NO_FAST_COMMIT:
case NOTACCEPTED:
case ACCEPTED:
// these statuses are always retained
newByIdBuffer[pos - ++retainCount] = txn;
// remember how many entries had been retained when we reached the old minUndecidedById
if (i == minUndecidedById)
minUndecidedByIdDelta = retainCount;
break;
case APPLIED_NOT_DURABLE:
case APPLIED_DURABLE:
case APPLIED_NOT_EXECUTED:
// same per-epoch bound refresh as in the COMMITTED/STABLE branch
long epoch = txn.epoch();
if (epoch != activePruneEpoch && epochPrunedBefores != null)
{
activePruneEpochBefore = epochPrunedBefores.get(epoch);
activePruneEpoch = epoch;
}
// a removal candidate must sort strictly before the active bound both by executeAt and by id
boolean tryPrune = activePruneEpochBefore != null
&& txn.executeAt.compareTo(activePruneEpochBefore.executeAt) < 0
&& txn.compareTo(activePruneEpochBefore) < 0;
if (tryPrune)
{
TxnId[] missing = txn.missing();
// remove the candidate if it has no missing ids, or all of them are already in mergedMissing
if (missing == NO_TXNIDS || SortedArrays.isSubset(missing, 0, missing.length, mergedMissing, 0, mergedMissingCount))
{
if (txn.mayExecute())
{ // if we don't execute, we don't track in committedByExecuteAt, so don't need to update bookkeeping for removing from there
// the second pass removes missing == NO_TXNIDS candidates on its own; any other removal
// must be recorded here so the second pass can recognise it by its executeAt
if (missing != NO_TXNIDS)
{
if (removedExecuteAtCount == removedExecuteAts.length)
removedExecuteAts = cachedAny().resize(removedExecuteAts, removedExecuteAtCount, Math.max(8, removedExecuteAtCount + (removedExecuteAtCount >> 1)));
removedExecuteAts[removedExecuteAtCount++] = txn.executeAt;
}
++removedCommittedCount;
}
// 'continue' targets the enclosing for loop: the entry is not copied into newByIdBuffer
continue;
}
// candidate kept: when executeAt is the txn itself (reference equality), fold its missing ids into
// mergedMissing so they are considered in the subset test for entries visited later (lower indexes)
if (txn.executeAt == txn)
{
mergedMissing = SortedArrays.linearUnion(missing, missing.length, mergedMissing, mergedMissingCount, missingBuffers);
mergedMissingCount = missingBuffers.sizeOfLast(mergedMissing);
}
}
newByIdBuffer[pos - ++retainCount] = txn;
// intentional fall-through into the shared 'break' below
case INVALIDATED:
case PRUNED:
case ERASED:
// entries reaching here directly (not via fall-through) are not copied, i.e. they are dropped
break;
}
}
// nothing below pos was dropped: hand back the original instance
// NOTE(review): missingBuffers.discardBuffers() is not reached on this path - confirm this is acceptable
if (pos == retainCount)
return cfk;
int removedByIdCount = pos - retainCount;
// re-index minUndecidedById: entries at or after pos shift down by the number removed; an entry below pos
// lands at (final retainCount - retainCount at the time it was retained)
if (minUndecidedById >= 0)
{
if (minUndecidedById >= pos)
minUndecidedById -= removedByIdCount;
else
minUndecidedById = retainCount - minUndecidedByIdDelta;
}
// new byId = retained entries from the buffer, followed by the untouched tail byId[pos..]
newById = new TxnInfo[byId.length - removedByIdCount];
System.arraycopy(newByIdBuffer, pos - retainCount, newById, 0, retainCount);
System.arraycopy(byId, pos, newById, retainCount, byId.length - pos);
missingBuffers.discardBuffers();
}
TxnInfo[] newCommittedByExecuteAt;
{ // copy to new committedByExecuteAt array
// sort the recorded executeAts so they can be consumed from the end in step with the reverse scan below
Arrays.sort(removedExecuteAts, 0, removedExecuteAtCount);
newCommittedByExecuteAt = new TxnInfo[committedByExecuteAt.length - removedCommittedCount];
// NOTE(review): assumes newPrunedBefore is present in committedByExecuteAt; binarySearch returns a negative
// value otherwise, which would make the arraycopy after the loop fail - confirm callers guarantee this
int sourcePos = Arrays.binarySearch(committedByExecuteAt, newPrunedBefore, TxnInfo::compareExecuteAt);
// entries at or after sourcePos shift down by removedCommittedCount; entries before it are written backwards
int insertPos = sourcePos - removedCommittedCount;
int removedExecuteAtPos = removedExecuteAtCount - 1;
TxnInfo activePruneEpochBefore = newPrunedBefore; // note that if activePrunedBefore != newPrunedBefore, we compare with executeAt
long activePruneEpoch = newPrunedBefore.epoch();
for (int i = sourcePos - 1; i >= 0 ; --i)
{
TxnInfo txn = committedByExecuteAt[i];
if (txn.is(APPLIED))
{
// mirror the first pass: same per-epoch bound refresh and the same two strict comparisons
long epoch = txn.epoch();
if (epoch != activePruneEpoch && epochPrunedBefores != null)
{
activePruneEpochBefore = epochPrunedBefores.get(epoch);
activePruneEpoch = epoch;
}
boolean tryPrune = activePruneEpochBefore != null
&& txn.compareTo(activePruneEpochBefore) < 0
&& txn.executeAt.compareTo(activePruneEpochBefore.executeAt) < 0;
if (tryPrune)
{
TxnId[] missing = txn.missing();
// the first pass removes every candidate whose missing is NO_TXNIDS, so skip it here too
if (missing == NO_TXNIDS)
continue;
// otherwise it was removed only if its executeAt (compared by reference) is the next recorded one
if (removedExecuteAtPos >= 0 && removedExecuteAts[removedExecuteAtPos] == txn.executeAt)
{
--removedExecuteAtPos;
continue;
}
}
}
newCommittedByExecuteAt[--insertPos] = txn;
}
// copy the untouched tail starting at newPrunedBefore's position
System.arraycopy(committedByExecuteAt, sourcePos, newCommittedByExecuteAt, sourcePos - removedCommittedCount, committedByExecuteAt.length - sourcePos);
}
cachedAny().forceDiscard(removedExecuteAts, removedExecuteAtCount);
// shift the index by the number of entries removed from committedByExecuteAt
// (presumably every removed entry precedes that index - confirm)
int newMaxAppliedWriteByExecuteAt = cfk.maxAppliedWriteByExecuteAt - removedCommittedCount;
int newMaxAppliedPreBootstrapWriteById = recomputeMaxAppliedPreBootstrapWriteById(cfk.bounds, newById, cfk.maxAppliedPreBootstrapWriteById);
// newById[retainCount] is byId[pos] (first element of the copied tail), which must be the new bound
Invariants.require(newById[retainCount] == newPrunedBefore);
return new CommandsForKey(cfk.key, cfk.bounds, newById, minUndecidedById, newMaxAppliedPreBootstrapWriteById, newCommittedByExecuteAt, newMaxAppliedWriteByExecuteAt, cfk.maxUniqueHlc, cfk.loadingPruned, retainCount, cfk.unmanageds);
}