private void notifyManaged()

in accord-core/src/main/java/accord/local/cfk/CommandsForKey.java [1855:1947]


    /**
     * Scans {@code committedByExecuteAt} from {@code maxAppliedWriteByExecuteAt + 1} up to (but excluding)
     * {@code mayExecuteToIndex} and informs {@code notifySink} about transactions whose execution state
     * on this key has changed:
     * <ul>
     *   <li>a {@code COMMITTED} transaction that has not yet been flagged as waiting is reported via
     *       {@code waitingOn(..., SaveStatus.Stable, HasStableDeps, true)};</li>
     *   <li>a {@code STABLE} transaction whose remaining {@code missing()} entries are all accounted for by
     *       the running count of earlier unapplied/undecided transactions is reported via
     *       {@code notWaiting(...)} followed by {@code waitingOn(..., SaveStatus.PreApplied, CanApply, false)}.</li>
     * </ul>
     * The notification flags are recorded on the {@code TxnInfo} in place, so each transaction is notified at
     * most once per flag. The scan returns as soon as it has processed the first unapplied write.
     *
     * @param safeStore                store handle forwarded unchanged to every {@code notifySink} call
     * @param kinds                    filter applied to each candidate; a transaction is only notified if
     *                                 {@code kinds.test(txn)} passes or its index equals {@code mayExecuteAny}
     * @param mayNotExecuteBeforeIndex entries at a lower index are never notified, but are still counted
     * @param mayExecuteToIndex        exclusive upper bound of the scan over {@code committedByExecuteAt}
     * @param mayExecuteAny            index of an entry that bypasses the {@code kinds} filter
     * @param notifySink               receiver of the {@code waitingOn}/{@code notWaiting} notifications
     */
    private void notifyManaged(SafeCommandStore safeStore, Kinds kinds, int mayNotExecuteBeforeIndex, int mayExecuteToIndex, int mayExecuteAny, NotifySink notifySink)
    {
        // Cursor into byId used to lazily count undecided transactions; starts at the end of the array
        // (i.e. nothing to count) when minUndecidedById is negative.
        int undecidedIndex = minUndecidedById < 0 ? byId.length : minUndecidedById;
        // Running tally updated via unappliedCountersDelta(kindOrdinal) and read via unappliedCount(...);
        // presumably several per-kind counters packed into one long - confirm against those helpers.
        long unappliedCounters = 0L;
        TxnId minUndecided = minUndecided();
        if (minUndecided == null)
            minUndecided = bootstrappedAt(bounds); // we don't count txns before this as waiting to execute

        for (int i = maxAppliedWriteByExecuteAt + 1; i < mayExecuteToIndex ; ++i)
        {
            TxnInfo txn = committedByExecuteAt[i];
            // applied transactions neither need notifying nor contribute to the unapplied tally
            if (txn.is(APPLIED))
                continue;

            if (txn.mayExecute() && !txn.hasNotifiedReady())
            {
                // Only notify entries inside the permitted index window, matching the kind filter (or explicitly
                // exempted from it via mayExecuteAny), and not blocked by isWaitingOnPruned(...).
                if (i >= mayNotExecuteBeforeIndex && (kinds.test(txn) || i == mayExecuteAny) && !isWaitingOnPruned(loadingPruned, txn, txn.executeAt, bounds))
                {
                    // statuses other than COMMITTED and STABLE have no case here and so trigger no notification
                    switch (txn.status())
                    {
                        case COMMITTED:
                        {
                            // already told the sink this transaction is waiting; do not repeat
                            if (txn.hasNotifiedWaiting())
                                break;

                            // cannot execute as dependencies not stable, so notify progress log to get or decide stable deps
                            notifySink.waitingOn(safeStore, txn, key, SaveStatus.Stable, HasStableDeps, true);
                            txn.setNotifiedWaitingInPlace();
                            break;
                        }

                        case STABLE:
                        {
                            // NOTE(review): the enclosing condition already requires !hasNotifiedReady(),
                            // so this check looks redundant - confirm before removing.
                            if (txn.hasNotifiedReady())
                                break;

                            if (undecidedIndex < byId.length)
                            {
                                // Advance the byId cursor up to the position of txn.executeAt, adding each entry
                                // passed over to the tally unless it compares at or above COMMITTED or
                                // mayExecute(...) rejects it.
                                int nextUndecidedIndex = SortedArrays.exponentialSearch(byId, undecidedIndex, byId.length, txn.executeAt, Timestamp::compareTo, FAST);
                                // a negative result is mapped to -1 - result (presumably the insertion point - confirm against SortedArrays)
                                if (nextUndecidedIndex < 0) nextUndecidedIndex = -1 -nextUndecidedIndex;
                                while (undecidedIndex < nextUndecidedIndex)
                                {
                                    TxnInfo backfillTxn = byId[undecidedIndex++];
                                    if (backfillTxn.compareTo(InternalStatus.COMMITTED) >= 0 || !mayExecute(backfillTxn)) continue;
                                    unappliedCounters += unappliedCountersDelta(backfillTxn.kindOrdinal());
                                }
                            }

                            // number of tallied earlier transactions relevant to this transaction's kind
                            int expectMissingCount = unappliedCount(unappliedCounters, txn.kindOrdinal());

                            // We remove committed transactions from the missing set, since they no longer need them there
                            // So the missing collection represents only those uncommitted transaction ids that a transaction
                            // witnesses/conflicts with. So we may simply count all of those we know of with a lower TxnId,
                            // and if the count is the same then we are not awaiting any of them for execution and can remove
                            // this command's dependency on this key for execution.
                            TxnId[] missing = txn.missing();
                            int missingCount = missing.length;
                            if (missingCount > 0)
                            {
                                int missingFrom = 0;
                                if (minUndecided != null)
                                {
                                    // ignore missing ids that sort before minUndecided; they are excluded from the count
                                    missingFrom = SortedArrays.binarySearch(missing, 0, missing.length, minUndecided, TxnId::compareTo, FAST);
                                    if (missingFrom < 0) missingFrom = -1 - missingFrom;
                                    missingCount -= missingFrom;
                                }
                                // of the remaining ids, discount those whose execution is not managed here or that are flagged UNSTABLE
                                for (int j = missingFrom ; j < missing.length ; ++j)
                                {
                                    if (!managesExecution(missing[j]) || missing[j].is(UNSTABLE))
                                        --missingCount;
                                }
                            }
                            // the counts agree, so per the reasoning above nothing tracked by this key is still blocking txn
                            if (expectMissingCount == missingCount)
                            {
                                TxnId txnId = txn.plainTxnId();
                                // minUniqueHlc stays 0 unless txn is a write and either there is no applied
                                // pre-bootstrap write or that write's executeAt is before txn's executeAt.
                                long minUniqueHlc = 0;
                                if (txnId.isWrite() && (maxAppliedPreBootstrapWriteById < 0 || byId[maxAppliedPreBootstrapWriteById].executeAt.compareTo(txn.executeAt) < 0))
                                    minUniqueHlc = maxUniqueHlc + 1;
                                notifySink.notWaiting(safeStore, txnId, key, minUniqueHlc);
                                // TODO (expected): avoid invoking this here; we may do redundant work if we have local dependencies we're already waiting on
                                notifySink.waitingOn(safeStore, txn, key, SaveStatus.PreApplied, CanApply, false);
                                txn.setNotifiedReadyInPlace();
                            }
                            // last case in the switch, so no break is required
                        }
                    }
                }
            }

            // Every unapplied transaction visited is added to the tally, whether or not it was notified above,
            // so that later entries in the scan see it in their expected-missing count.
            unappliedCounters += unappliedCountersDelta(txn.kindOrdinal());
            if (txn.is(Kind.Write))
                return; // the minimum execute index occurs after the next write, so nothing to do yet
        }
    }