static NotifyUnmanagedResult notifyUnmanaged()

in accord-core/src/main/java/accord/local/cfk/PostProcess.java [250:445]


    /**
     * Determines which entries of {@code unmanageds} no longer need to wait, following either a change to a
     * single transaction ({@code curInfo} -> {@code newInfo}) or new bounds information, removes them from the
     * array and chains a {@link PostProcess} notifier for each group removed.
     *
     * <p>The work is done in three stages:
     * <ol>
     *   <li>Entries at the front of the array, up to the index returned by {@code findCommit} for the minimum
     *       undecided (or loading-pruned) timestamp, are handed to {@code NotifyUnmanagedOfCommit}.</li>
     *   <li>If {@code newInfo} is {@code APPLIED}, entries from {@code findFirstApply} up to {@code findApply}
     *       for the max contiguous applied executeAt are handed to {@code NotifyNotWaiting}. Otherwise, if there
     *       is no {@code newInfo} but the bounds are new, entries whose wait can no longer be satisfied are
     *       re-notified or released.</li>
     *   <li>If a committed transaction has been invalidated (or worse), or the bounds are new and include a
     *       bootstrap point, entries waiting on an affected executeAt are either re-pointed at an earlier
     *       committed transaction or released via {@code NotifyNotWaiting}.</li>
     * </ol>
     *
     * @param unmanageds                 the unmanaged entries; never modified in place (copied before any write).
     *                                   The index arithmetic assumes entries before {@code findFirstApply} are
     *                                   not apply-waiters, and that apply-waiters are ordered by
     *                                   {@code waitingUntil} -- TODO confirm against the callers
     * @param byId                       managed transactions; iterated in order with an early exit once an
     *                                   entry exceeds the bootstrap point, so presumably sorted by TxnId
     * @param minUndecidedById           index into {@code byId} of the minimum undecided transaction, or a
     *                                   negative value if there is none
     * @param committedByExecuteAt       committed transactions, binary-searched by {@code executeAt}
     * @param maxAppliedWriteByExecuteAt passed through to the max-contiguous-applied helpers
     * @param loadingPruned              BTree of TxnId; when non-empty its minimum relevant entry caps the
     *                                   commit notification bound
     * @param bounds                     supplies {@code gcBefore}, {@code bootstrappedAt} and {@code endEpoch}
     * @param isNewBoundsInfo            whether {@code bounds} carries new information to be processed
     * @param curInfo                    the previous state of the updated transaction, if any
     * @param newInfo                    the new state of the updated transaction, if any
     * @return {@code null} if no notifier was produced; otherwise the updated array with the notifier chain
     */
    static NotifyUnmanagedResult notifyUnmanaged(Unmanaged[] unmanageds,
                                                 TxnInfo[] byId,
                                                 int minUndecidedById,
                                                 TxnInfo[] committedByExecuteAt,
                                                 int maxAppliedWriteByExecuteAt,
                                                 Object[] loadingPruned,
                                                 QuickBounds bounds,
                                                 boolean isNewBoundsInfo,
                                                 @Nullable TxnInfo curInfo,
                                                 @Nullable TxnInfo newInfo)
    {
        // TODO (expected): can we relax this to shardRedundantBefore?
        TxnId redundantBefore = bounds.gcBefore;
        TxnId bootstrappedAt = bounds.bootstrappedAt;
        // a bootstrap point that is not after gcBefore is treated as absent
        if (bootstrappedAt.compareTo(redundantBefore) <= 0) bootstrappedAt = null;

        PostProcess notifier = null;
        {
            // notify commit uses exclusive bounds, as we use minUndecided
            Timestamp minUndecided = minUndecidedById < 0 ? Timestamp.MAX : byId[minUndecidedById];
            if (!BTree.isEmpty(loadingPruned))
            {
                // only consider loading-pruned entries at or after the bootstrap point, when there is one
                TxnId minLoadingPruned = bootstrappedAt == null ? BTree.findByIndex(loadingPruned, 0)
                                                                : BTree.ceil(loadingPruned, TxnId::compareTo, bootstrappedAt);

                minUndecided = Timestamp.nonNullOrMin(minUndecided, minLoadingPruned);
            }
            int end = findCommit(unmanageds, minUndecided);
            if (end > 0)
            {
                // the first 'end' entries are notified of commit and dropped from the array
                TxnId[] notifyUnmanaged = new TxnId[end];
                for (int i = 0 ; i < end ; ++i)
                    notifyUnmanaged[i] = unmanageds[i].txnId;

                unmanageds = Arrays.copyOfRange(unmanageds, end, unmanageds.length);
                notifier = new PostProcess.NotifyUnmanagedOfCommit(null, notifyUnmanaged);
            }
        }

        {
            if (newInfo != null && newInfo.is(APPLIED))
            {
                TxnInfo maxContiguousApplied = CommandsForKey.maxContiguousManagedApplied(committedByExecuteAt, maxAppliedWriteByExecuteAt, bootstrappedAt);
                // only act if the newly applied transaction is covered by the contiguous applied range
                if (maxContiguousApplied != null && maxContiguousApplied.compareExecuteAt(newInfo) >= 0)
                {
                    Timestamp applyTo = maxContiguousApplied.executeAt;
                    int start = findFirstApply(unmanageds);
                    int end = findApply(unmanageds, start, applyTo);
                    if (start != end)
                    {
                        TxnId[] notifyNotWaiting = selectUnmanaged(unmanageds, start, end);
                        unmanageds = removeUnmanaged(unmanageds, start, end);
                        notifier = new PostProcess.NotifyNotWaiting(notifier, notifyNotWaiting);
                    }
                }
            }
            else if (newInfo == null && isNewBoundsInfo)
            {
                int firstApply = findFirstApply(unmanageds);
                {   // process unmanaged entries ordered before the first APPLY entry whose waitingUntil epoch is at or
                    // beyond bounds.endEpoch; these are re-notified through the commit path
                    int start = firstApply;
                    while (start > 0 && unmanageds[start - 1].waitingUntil.epoch() >= bounds.endEpoch)
                        --start;

                    if (start != firstApply)
                    {
                        // TODO (desired): we can recompute the waitingUntil here, instead of relying on notify unmanaged to do it for us
                        //  however this should be rare, and probably fine to rely on renotifying
                        TxnId[] notifyOfCommit = selectUnmanaged(unmanageds, start, firstApply);
                        unmanageds = removeUnmanaged(unmanageds, start, firstApply);
                        notifier = new PostProcess.NotifyUnmanagedOfCommit(notifier, notifyOfCommit);
                        // the removed range ended at firstApply, so the first APPLY entry has moved down to 'start'
                        firstApply = start;
                    }
                }
                {   // process unmanaged waiting on applies that may now not occur
                    // merge-walk the APPLY entries against the committed transactions after the contiguous applied
                    // prefix: an entry is retained only if its waitingUntil matches one of those executeAt exactly
                    int start = firstApply;
                    int end = start;
                    int j = 1 + maxContiguousManagedAppliedIndex(committedByExecuteAt, maxAppliedWriteByExecuteAt, bootstrappedAt);
                    while (end < unmanageds.length && j < committedByExecuteAt.length)
                    {
                        int c = committedByExecuteAt[j].executeAt.compareTo(unmanageds[end].waitingUntil);
                        if (c == 0)
                        {
                            // unmanageds[end] is still waiting on a real transaction; flush the pending run before it
                            if (start != end)
                            {
                                TxnId[] notifyNotWaiting = selectUnmanaged(unmanageds, start, end);
                                unmanageds = removeUnmanaged(unmanageds, start, end);
                                // the retained entry has shifted down to 'start'
                                end = start;
                                notifier = new PostProcess.NotifyNotWaiting(notifier, notifyNotWaiting);
                            }
                            start = ++end;
                        }
                        else if (c < 0)
                        {
                            ++j;
                        }
                        else
                        {
                            // no committed transaction at this waitingUntil; extend the run to release
                            ++end;
                        }
                    }
                    // whatever remains from 'start' has nothing left to wait for
                    if (start != unmanageds.length)
                    {
                        TxnId[] notifyNotWaiting = selectUnmanaged(unmanageds, start, unmanageds.length);
                        unmanageds = removeUnmanaged(unmanageds, start, unmanageds.length);
                        notifier = new PostProcess.NotifyNotWaiting(notifier, notifyNotWaiting);
                    }
                }
            }
        }

        // predicate over a waitingUntil/executeAt that identifies waits that must be re-evaluated
        Predicate<Timestamp> rescheduleOrNotifyIf = null;
        if ((newInfo != null && newInfo.isAtLeast(INVALIDATED) && curInfo != null && curInfo.isCommittedToExecute()))
        {
            // a transaction that was committed to execute is now invalidated (or beyond): anything waiting on its
            // executeAt must be revisited
            rescheduleOrNotifyIf = curInfo.executeAt::equals;
        }

        if (isNewBoundsInfo && bootstrappedAt != null)
        {
            Timestamp maxPreBootstrap;
            {
                Timestamp tmp = bootstrappedAt;
                for (TxnInfo txn : byId)
                {
                    if (txn.compareTo(bootstrappedAt) > 0)
                        break;
                    // while we can in principle exclude all transactions with a lower txnId regardless of their executeAt
                    // for consistent handling with other transactions we don't leap ahead by executeAt as this permits
                    // us to also exclude transactions with a higher txnId which is not consistent with other validity checks
                    // which don't have this additional context
                    if (txn.executeAt.compareTo(bootstrappedAt) > 0)
                        continue;
                    tmp = Timestamp.nonNullOrMax(tmp, txn.executeAt);
                }
                maxPreBootstrap = tmp;
            }
            if (rescheduleOrNotifyIf == null)
                rescheduleOrNotifyIf = test -> test.compareTo(maxPreBootstrap) <= 0;
            else
                rescheduleOrNotifyIf = test -> curInfo.executeAt.equals(test) || test.compareTo(maxPreBootstrap) <= 0;
        }

        if (rescheduleOrNotifyIf != null)
        {
            // this is a rare edge case, but we might have unmanaged transactions waiting on this command we must re-schedule or notify
            // 'clone' tracks whether we must copy the array before writing into it
            boolean clone = true;
            int start = findFirstApply(unmanageds);
            while (start < unmanageds.length)
            {
                if (rescheduleOrNotifyIf.test(unmanageds[start].waitingUntil))
                {
                    // extend to the full run of consecutive matching entries
                    int end = start + 1;
                    while (end < unmanageds.length && rescheduleOrNotifyIf.test(unmanageds[end].waitingUntil))
                        ++end;

                    // find committed predecessor, if any
                    int predecessor = SortedArrays.binarySearch(committedByExecuteAt, 0, committedByExecuteAt.length, unmanageds[start].waitingUntil, (t, i) -> t.compareTo(i.executeAt), FAST);
                    if (predecessor < 0) predecessor = -2 - predecessor;
                    else predecessor = predecessor - 1;

                    // skip predecessors that are themselves affected
                    while (predecessor >= 0 && rescheduleOrNotifyIf.test(committedByExecuteAt[predecessor].executeAt))
                        --predecessor;

                    if (predecessor >= 0)
                    {
                        // a predecessor inside the contiguous applied prefix leaves nothing to wait for
                        int maxContiguousApplied = maxContiguousManagedAppliedIndex(committedByExecuteAt, maxAppliedWriteByExecuteAt, bootstrappedAt);
                        if (maxContiguousApplied >= predecessor)
                            predecessor = -1;
                    }

                    if (predecessor >= 0)
                    {
                        // re-point the run at the surviving predecessor
                        Timestamp waitingUntil = committedByExecuteAt[predecessor].plainExecuteAt();
                        if (clone) unmanageds = unmanageds.clone();
                        clone = false;
                        for (int i = start ; i < end ; ++i)
                            unmanageds[i] = new Unmanaged(APPLY, unmanageds[i].txnId, waitingUntil);
                    }
                    else
                    {
                        TxnId[] notifyNotWaiting = selectUnmanaged(unmanageds, start, end);
                        unmanageds = removeUnmanaged(unmanageds, start, end);
                        // the array has shrunk by (end - start), so the entry that was at 'end' now sits at 'start';
                        // resume from there, otherwise a later matching run could be skipped without being processed
                        end = start;
                        notifier = new PostProcess.NotifyNotWaiting(notifier, notifyNotWaiting);
                        clone = false;
                    }
                    start = end - 1;
                }
                ++start;
            }
        }

        // NOTE(review): if the loop above only rewrote waitingUntil values (no removals) and no earlier stage created
        // a notifier, the rewritten array is discarded here -- confirm this is what callers expect
        if (notifier == null)
            return null;

        return new NotifyUnmanagedResult(unmanageds, notifier);
    }