in accord-core/src/main/java/accord/local/cfk/PostProcess.java [250:445]
static NotifyUnmanagedResult notifyUnmanaged(Unmanaged[] unmanageds,
TxnInfo[] byId,
int minUndecidedById,
TxnInfo[] committedByExecuteAt,
int maxAppliedWriteByExecuteAt,
Object[] loadingPruned,
QuickBounds bounds,
boolean isNewBoundsInfo,
@Nullable TxnInfo curInfo,
@Nullable TxnInfo newInfo)
{
// TODO (expected): can we relax this to shardRedundantBefore?
TxnId redundantBefore = bounds.gcBefore;
TxnId bootstrappedAt = bounds.bootstrappedAt;
if (bootstrappedAt.compareTo(redundantBefore) <= 0) bootstrappedAt = null;
PostProcess notifier = null;
{
// notify commit uses exclusive bounds, as we use minUndecided
Timestamp minUndecided = minUndecidedById < 0 ? Timestamp.MAX : byId[minUndecidedById];
if (!BTree.isEmpty(loadingPruned))
{
TxnId minLoadingPruned = bootstrappedAt == null ? BTree.findByIndex(loadingPruned, 0)
: BTree.ceil(loadingPruned, TxnId::compareTo, bootstrappedAt);
minUndecided = Timestamp.nonNullOrMin(minUndecided, minLoadingPruned);
}
int end = findCommit(unmanageds, minUndecided);
if (end > 0)
{
TxnId[] notifyUnmanaged = new TxnId[end];
for (int i = 0 ; i < end ; ++i)
notifyUnmanaged[i] = unmanageds[i].txnId;
unmanageds = Arrays.copyOfRange(unmanageds, end, unmanageds.length);
notifier = new PostProcess.NotifyUnmanagedOfCommit(null, notifyUnmanaged);
}
}
{
if (newInfo != null && newInfo.is(APPLIED))
{
TxnInfo maxContiguousApplied = CommandsForKey.maxContiguousManagedApplied(committedByExecuteAt, maxAppliedWriteByExecuteAt, bootstrappedAt);
if (maxContiguousApplied != null && maxContiguousApplied.compareExecuteAt(newInfo) >= 0)
{
Timestamp applyTo = maxContiguousApplied.executeAt;
int start = findFirstApply(unmanageds);
int end = findApply(unmanageds, start, applyTo);
if (start != end)
{
TxnId[] notifyNotWaiting = selectUnmanaged(unmanageds, start, end);
unmanageds = removeUnmanaged(unmanageds, start, end);
notifier = new PostProcess.NotifyNotWaiting(notifier, notifyNotWaiting);
}
}
}
else if (newInfo == null && isNewBoundsInfo)
{
int firstApply = findFirstApply(unmanageds);
{ // process unmanaged waiting on applies that may now not occur
int start = firstApply;
while (start > 0 && unmanageds[start - 1].waitingUntil.epoch() >= bounds.endEpoch)
--start;
if (start != firstApply)
{
// TODO (desired): we can recompute the waitingUntil here, instead of relying on notify unmanaged to do it for us
// however this should be rare, and probably fine to rely on renotifying
TxnId[] notifyOfCommit = selectUnmanaged(unmanageds, start, firstApply);
unmanageds = removeUnmanaged(unmanageds, start, firstApply);
notifier = new PostProcess.NotifyUnmanagedOfCommit(notifier, notifyOfCommit);
firstApply = start;
}
}
{ // process unmanaged waiting on applies that may now not occur
int start = firstApply;
int end = start;
int j = 1 + maxContiguousManagedAppliedIndex(committedByExecuteAt, maxAppliedWriteByExecuteAt, bootstrappedAt);
while (end < unmanageds.length && j < committedByExecuteAt.length)
{
int c = committedByExecuteAt[j].executeAt.compareTo(unmanageds[end].waitingUntil);
if (c == 0)
{
if (start != end)
{
TxnId[] notifyNotWaiting = selectUnmanaged(unmanageds, start, end);
unmanageds = removeUnmanaged(unmanageds, start, end);
end = start;
notifier = new PostProcess.NotifyNotWaiting(notifier, notifyNotWaiting);
}
start = ++end;
}
else if (c < 0)
{
++j;
}
else
{
++end;
}
}
if (start != unmanageds.length)
{
TxnId[] notifyNotWaiting = selectUnmanaged(unmanageds, start, unmanageds.length);
unmanageds = removeUnmanaged(unmanageds, start, unmanageds.length);
notifier = new PostProcess.NotifyNotWaiting(notifier, notifyNotWaiting);
}
}
}
}
Predicate<Timestamp> rescheduleOrNotifyIf = null;
if ((newInfo != null && newInfo.isAtLeast(INVALIDATED) && curInfo != null && curInfo.isCommittedToExecute()))
{
rescheduleOrNotifyIf = curInfo.executeAt::equals;
}
if (isNewBoundsInfo && bootstrappedAt != null)
{
Timestamp maxPreBootstrap;
{
Timestamp tmp = bootstrappedAt;
for (TxnInfo txn : byId)
{
if (txn.compareTo(bootstrappedAt) > 0)
break;
// while we can in principle exclude all transactions with a lower txnId regardless of their executeAt
// for consistent handling with other transactions we don't leap ahead by executeAt as this permits
// us to also exclude transactions with a higher txnId which is not consistent with other validity checks
// which don't have this additional context
if (txn.executeAt.compareTo(bootstrappedAt) > 0)
continue;
tmp = Timestamp.nonNullOrMax(tmp, txn.executeAt);
}
maxPreBootstrap = tmp;
}
if (rescheduleOrNotifyIf == null)
rescheduleOrNotifyIf = test -> test.compareTo(maxPreBootstrap) <= 0;
else
rescheduleOrNotifyIf = test -> curInfo.executeAt.equals(test) || test.compareTo(maxPreBootstrap) <= 0;
}
if (rescheduleOrNotifyIf != null)
{
// this is a rare edge case, but we might have unmanaged transactions waiting on this command we must re-schedule or notify
boolean clone = true;
int start = findFirstApply(unmanageds);
while (start < unmanageds.length)
{
if (rescheduleOrNotifyIf.test(unmanageds[start].waitingUntil))
{
int end = start + 1;
while (end < unmanageds.length && rescheduleOrNotifyIf.test(unmanageds[end].waitingUntil))
++end;
// find committed predecessor, if any
int predecessor = SortedArrays.binarySearch(committedByExecuteAt, 0, committedByExecuteAt.length, unmanageds[start].waitingUntil, (t, i) -> t.compareTo(i.executeAt), FAST);
if (predecessor < 0) predecessor = -2 - predecessor;
else predecessor = predecessor - 1;
while (predecessor >= 0 && rescheduleOrNotifyIf.test(committedByExecuteAt[predecessor].executeAt))
--predecessor;
if (predecessor >= 0)
{
int maxContiguousApplied = maxContiguousManagedAppliedIndex(committedByExecuteAt, maxAppliedWriteByExecuteAt, bootstrappedAt);
if (maxContiguousApplied >= predecessor)
predecessor = -1;
}
if (predecessor >= 0)
{
Timestamp waitingUntil = committedByExecuteAt[predecessor].plainExecuteAt();
if (clone) unmanageds = unmanageds.clone();
clone = false;
for (int i = start ; i < end ; ++i)
unmanageds[i] = new Unmanaged(APPLY, unmanageds[i].txnId, waitingUntil);
}
else
{
TxnId[] notifyNotWaiting = selectUnmanaged(unmanageds, start, end);
unmanageds = removeUnmanaged(unmanageds, start, end);
notifier = new PostProcess.NotifyNotWaiting(notifier, notifyNotWaiting);
clone = false;
}
start = end - 1;
}
++start;
}
}
if (notifier == null)
return null;
return new NotifyUnmanagedResult(unmanageds, notifier);
}