in accord-core/src/main/java/accord/local/cfk/CommandsForKey.java [1337:1490]
/**
 * Reports to {@code visitor} the transactions tracked for this key that sort before
 * {@code startedBefore} in {@code byId} and match {@code testKind}, eliding some committed
 * transactions that are already covered by a later committed write.
 *
 * <p>The method proceeds in four steps:
 * <ol>
 *   <li>passes {@code maxUniqueHlc} to {@link ActiveCommandVisitor#visitMaxAppliedHlc};</li>
 *   <li>locates in {@code committedByExecuteAt} the latest {@code Write} whose {@code executeAt} is
 *       strictly before {@code startedBefore}; this is the bound used for elision;</li>
 *   <li>walks {@code byId[0, insertPos(startedBefore))}, interleaving visible {@code loadingPruned}
 *       entries (reported as {@code NOT_DIRECTLY_WITNESSED}) and visiting every entry that is not
 *       skipped by status or elided according to {@code dependencyElision()};</li>
 *   <li>if {@code startedBefore <= prunedBefore()}, additionally visits committed writes at or after
 *       {@code startedBefore} (by {@code executeAt}) as stand-ins for pruned transactions.</li>
 * </ol>
 *
 * @param startedBefore upper bound; only {@code byId} entries before {@code insertPos(startedBefore)} are considered
 * @param testKind      kinds filter applied via {@code txn.is(testKind)}; {@code AnyGloballyVisible} disables
 *                      outright elision and switches to the per-epoch max-write rule
 * @param visitor       callback receiving each reported transaction
 * @param p1            opaque argument forwarded unchanged to {@code visitor.visit}
 * @param p2            opaque argument forwarded unchanged to {@code visitor.visit}
 * @param <P1>          type of the first pass-through argument
 * @param <P2>          type of the second pass-through argument
 */
public <P1, P2> void visit(Timestamp startedBefore,
Kinds testKind,
ActiveCommandVisitor<P1, P2> visitor,
P1 p1, P2 p2)
{
// The HLC is reported before any transaction is visited.
visitor.visitMaxAppliedHlc(maxUniqueHlc);
TxnId prunedBefore = prunedBefore();
// Exclusive upper index into byId for the main loop below.
int end = insertPos(startedBefore);
// We only filter durable transactions less than BOTH the txnId and executeAt of our max preceding write.
// This is to avoid the following pre-bootstrap edge case, so this filtering can be made stricter in future:
// A replica is bootstrapping so that includes all transactions before the bootstrap point, but our latest
// transaction occurs before the bootstrap by TxnId, and some of its dependencies occur after.
// Because the replica is pre-bootstrap, it happily participates in an RX including these keys without
// actually witnessing any of the dependencies that have higher TxnId than the RX. The RX applies and
// the range is marked as durable. If now the replica then witnesses one of these transactions it will
// consider that they must be invalidated, as it had not witnessed them, they were not pre-bootstrap and
// they were pre-RX.
TxnInfo maxCommittedWriteBefore = NO_INFO;
// Cached "max committed write" for the epoch most recently examined by the AnyGloballyVisible path below;
// it is recomputed whenever a transaction from a different epoch is encountered.
TxnInfo maxCommittedWriteForEpoch = NO_INFO;
{
int from = 0, to = committedByExecuteAt.length;
if (maxAppliedWriteByExecuteAt >= 0)
{
// Use the max applied write as a pivot to halve the search range before the binary search.
if (committedByExecuteAt[maxAppliedWriteByExecuteAt].executeAt.compareTo(startedBefore) <= 0) from = maxAppliedWriteByExecuteAt;
else to = maxAppliedWriteByExecuteAt;
}
int i = SortedArrays.binarySearch(committedByExecuteAt, from, to, startedBefore, (f, v) -> f.compareTo(v.executeAt), FAST);
// Convert the result to the index of the last element strictly before startedBefore
// (assumes a miss is encoded as -(insertionPoint + 1) - TODO confirm against SortedArrays).
if (i < 0) i = -2 - i;
else --i;
// Step back to the nearest Write at or before that index; i ends at -1 if there is none.
while (i >= 0 && !committedByExecuteAt[i].is(Write)) --i;
if (i >= 0)
{
maxCommittedWriteBefore = committedByExecuteAt[i];
// Reference comparison: only seed the per-epoch cache when the entry's executeAt is the entry itself.
if (maxCommittedWriteBefore.executeAt == maxCommittedWriteBefore)
maxCommittedWriteForEpoch = maxCommittedWriteBefore;
}
}
// TODO (expected): consider whether this is strictly needed - if we've applied a transaction after the pruned
// point can we guarantee we have witnessed all earlier TxnId? If no RX has yet run, but the new epoch is
// applying transactions then maybe not...? But seems edge casey and may not exist.
// Reason this case out more fully, but in the meantime we'll make sure to return them.
// Both stay null unless pruned entries should be visited and some exist; nextPruned == null means "exhausted".
Iterator<LoadingPruned> loadingPruned = null;
LoadingPruned nextPruned = null;
if (shouldVisitMaybePruned() && !BTree.isEmpty(this.loadingPruned))
{
loadingPruned = BTree.iterator(this.loadingPruned);
nextPruned = loadingPruned.next();
}
for (int i = 0; i < end ; ++i)
{
TxnInfo txn = byId[i];
if (!txn.is(testKind)) continue;
if (!txn.isManaged()) continue;
// Emit pending pruned entries that sort before this txn, so they are interleaved with the byId entries.
// NOTE(review): pruned entries that do not sort before any txn reaching this point are never emitted
// by this loop - confirm that is intended.
while (nextPruned != null && nextPruned.compareTo(txn) < 0)
{
if (nextPruned.isVisible && nextPruned.is(testKind))
visitor.visit(p1, p2, NOT_DIRECTLY_WITNESSED, key, nextPruned.plainTxnId());
if (loadingPruned.hasNext()) nextPruned = loadingPruned.next();
else nextPruned = null;
}
// Within this switch: 'continue' skips the txn entirely, 'break' (or an unlisted status) falls
// through to the visitor.visit call after the switch.
switch (txn.status())
{
case TRANSITIVE:
case INVALIDATED:
case ERASED:
case PRUNED:
// These statuses are never reported.
continue;
case COMMITTED:
case STABLE:
case APPLIED_NOT_DURABLE:
case APPLIED_DURABLE:
case APPLIED_NOT_EXECUTED:
// Elision is only considered when txn is below the max preceding write by BOTH TxnId and
// executeAt, and is of a kind a Write witnesses; otherwise it is always reported.
if (txn.compareTo(maxCommittedWriteBefore) >= 0
|| txn.executeAt.compareTo(maxCommittedWriteBefore.executeAt) >= 0
|| !Write.witnesses(txn))
break;
// No case matches any other dependencyElision() value, so the txn is then reported.
switch (dependencyElision())
{
case IF_DURABLE:
// Non-durable transactions are reported; durable ones fall through to the ON handling.
if (!txn.isDurable())
break;
// intentional fall-through
case ON:
// For any filter other than AnyGloballyVisible the txn is elided outright.
if (testKind != AnyGloballyVisible)
continue;
// cannot be the max known write for the epoch, so no need to return it
if (!txn.isWrite())
break;
// if we witness everything we assume we're coordinating an ExclusiveSyncPoint
// which must execute in all epochs, so for ease of processing we return a dependency
// from all epochs we know
long epoch = txn.epoch();
if (epoch != maxCommittedWriteForEpoch.epoch())
{
// Recompute the cache: scan forward from the current entry while the epoch is unchanged
// and pick the write with the greatest executeAt among those that are committed to
// execute and whose executeAt is the entry itself (reference comparison).
maxCommittedWriteForEpoch = NO_INFO;
for (int j = i ; j < end ; ++j)
{
TxnInfo t = byId[j];
if (t.epoch() != epoch)
break;
if (!t.isWrite() || t.executeAt != t || !t.isCommittedToExecute())
continue;
if (t.compareExecuteAt(maxCommittedWriteForEpoch) > 0)
maxCommittedWriteForEpoch = t;
}
// No candidate in this epoch: use txn itself, so the check below cannot elide it.
if (maxCommittedWriteForEpoch == NO_INFO)
maxCommittedWriteForEpoch = txn;
}
// Elide only if strictly below the epoch's max write by both TxnId and executeAt.
if (txn.compareTo(maxCommittedWriteForEpoch) < 0
&& txn.executeAt.compareTo(maxCommittedWriteForEpoch.executeAt) < 0)
continue;
}
}
visitor.visit(p1, p2, txn.summaryStatus(), key, txn.plainTxnId());
}
if (startedBefore.compareTo(prunedBefore) <= 0)
{
// In the event we have pruned transactions that may execute before us, we take the earliest future dependency we can in their place.
// This may occur for any transaction that isn't witnessed by a Write (so, sync points and exclusive sync points).
// In the former case, this can only happen on recovery, as the original coordinator would propose a later executeAt than any dependency it may witness.
// NOTE(review): the search's upper bound is maxAppliedWriteByExecuteAt, which is not checked for >= 0 here -
// presumably prunedBefore >= startedBefore implies an applied write exists; confirm.
int i = SortedArrays.binarySearch(committedByExecuteAt, 0, maxAppliedWriteByExecuteAt, startedBefore, (f, v) -> f.compareTo(v.executeAt), FAST);
// On a miss, convert to the insertion point (same encoding assumption as the search above).
if (i < 0) i = -1 - i;
// Walk forward by executeAt, visiting each Write, until a later epoch is reached or a Write whose
// TxnId is after startedBefore has been visited.
while (i < committedByExecuteAt.length)
{
TxnInfo txn = committedByExecuteAt[i];
if (txn.epoch() > startedBefore.epoch())
break;
if (txn.is(Write))
{
visitor.visit(p1, p2, txn.summaryStatus(), key, committedByExecuteAt[i].plainTxnId());
if (txn.compareTo(startedBefore) > 0)
break;
}
++i;
}
}
}