in accord-core/src/main/java/accord/local/cfk/CommandsForKey.java [1855:1947]
private void notifyManaged(SafeCommandStore safeStore, Kinds kinds, int mayNotExecuteBeforeIndex, int mayExecuteToIndex, int mayExecuteAny, NotifySink notifySink)
{
int undecidedIndex = minUndecidedById < 0 ? byId.length : minUndecidedById;
long unappliedCounters = 0L;
TxnId minUndecided = minUndecided();
if (minUndecided == null)
minUndecided = bootstrappedAt(bounds); // we don't count txns before this as waiting to execute
for (int i = maxAppliedWriteByExecuteAt + 1; i < mayExecuteToIndex ; ++i)
{
TxnInfo txn = committedByExecuteAt[i];
if (txn.is(APPLIED))
continue;
if (txn.mayExecute() && !txn.hasNotifiedReady())
{
if (i >= mayNotExecuteBeforeIndex && (kinds.test(txn) || i == mayExecuteAny) && !isWaitingOnPruned(loadingPruned, txn, txn.executeAt, bounds))
{
switch (txn.status())
{
case COMMITTED:
{
if (txn.hasNotifiedWaiting())
break;
// cannot execute as dependencies not stable, so notify progress log to get or decide stable deps
notifySink.waitingOn(safeStore, txn, key, SaveStatus.Stable, HasStableDeps, true);
txn.setNotifiedWaitingInPlace();
break;
}
case STABLE:
{
if (txn.hasNotifiedReady())
break;
if (undecidedIndex < byId.length)
{
int nextUndecidedIndex = SortedArrays.exponentialSearch(byId, undecidedIndex, byId.length, txn.executeAt, Timestamp::compareTo, FAST);
if (nextUndecidedIndex < 0) nextUndecidedIndex = -1 -nextUndecidedIndex;
while (undecidedIndex < nextUndecidedIndex)
{
TxnInfo backfillTxn = byId[undecidedIndex++];
if (backfillTxn.compareTo(InternalStatus.COMMITTED) >= 0 || !mayExecute(backfillTxn)) continue;
unappliedCounters += unappliedCountersDelta(backfillTxn.kindOrdinal());
}
}
int expectMissingCount = unappliedCount(unappliedCounters, txn.kindOrdinal());
// We remove committed transactions from the missing set, since they no longer need them there
// So the missing collection represents only those uncommitted transaction ids that a transaction
// witnesses/conflicts with. So we may simply count all of those we know of with a lower TxnId,
// and if the count is the same then we are not awaiting any of them for execution and can remove
// this command's dependency on this key for execution.
TxnId[] missing = txn.missing();
int missingCount = missing.length;
if (missingCount > 0)
{
int missingFrom = 0;
if (minUndecided != null)
{
missingFrom = SortedArrays.binarySearch(missing, 0, missing.length, minUndecided, TxnId::compareTo, FAST);
if (missingFrom < 0) missingFrom = -1 - missingFrom;
missingCount -= missingFrom;
}
for (int j = missingFrom ; j < missing.length ; ++j)
{
if (!managesExecution(missing[j]) || missing[j].is(UNSTABLE))
--missingCount;
}
}
if (expectMissingCount == missingCount)
{
TxnId txnId = txn.plainTxnId();
long minUniqueHlc = 0;
if (txnId.isWrite() && (maxAppliedPreBootstrapWriteById < 0 || byId[maxAppliedPreBootstrapWriteById].executeAt.compareTo(txn.executeAt) < 0))
minUniqueHlc = maxUniqueHlc + 1;
notifySink.notWaiting(safeStore, txnId, key, minUniqueHlc);
// TODO (expected): avoid invoking this here; we may do redundant work if we have local dependencies we're already waiting on
notifySink.waitingOn(safeStore, txn, key, SaveStatus.PreApplied, CanApply, false);
txn.setNotifiedReadyInPlace();
}
}
}
}
}
unappliedCounters += unappliedCountersDelta(txn.kindOrdinal());
if (txn.is(Kind.Write))
return; // the minimum execute index occurs after the next write, so nothing to do yet
}
}