static CommandsForKeyUpdate updateUnmanaged()

in accord-core/src/main/java/accord/local/cfk/Updating.java [878:1133]


    static CommandsForKeyUpdate updateUnmanaged(CommandsForKey cfk, SafeCommand safeCommand, UpdateUnmanagedMode mode, @Nullable List<CommandsForKey.Unmanaged> update)
    {
        boolean register = mode != UPDATE;
        Invariants.requireArgument(mode == UPDATE || update == null);
        if (safeCommand.current().hasBeen(Status.Truncated))
            return cfk;

        Command orig = safeCommand.current();
        if (orig.saveStatus() == Uninitialised && orig.txnId().is(EphemeralRead))
            return cfk;

        Command.Committed command = safeCommand.current().asCommitted();
        TxnId waitingTxnId = command.txnId();
        // used only to decide if an executeAt is included _on the assumption the TxnId is_. For ?[EX] this is all timestamps
        Timestamp compareExecuteAt = waitingTxnId.awaitsOnlyDeps() ? Timestamp.MAX : command.executeAt();

        TxnInfo[] byId = cfk.byId, newById = null;
        RelationMultiMap.SortedRelationList<TxnId> txnIds = command.partialDeps().keyDeps.txnIdsWithFlags(cfk.key());
        TxnId[] missing = NO_TXNIDS;
        int missingCount = 0;
        // we only populate dependencies to facilitate execution, not for any distributed decision,
        // so we can filter to only transactions we need to execute locally
        int i = txnIds.find(cfk.redundantOrBootstrappedBefore());
        if (i < 0) i = -1 - i;
        int waitingFromIndex = i; // the min input index we expect to execute
        if (waitingTxnId.is(ExclusiveSyncPoint) && waitingTxnId.is(Range) && mode == REGISTER)
        {
            // for RX we register all our transitive dependencies to make sure we can answer coordinated dependency calculations
            // in this case we separate out the position from which we insert missing txnId and where we compute readiness to execute
            i = txnIds.find(cfk.redundantBefore());
            if (i < 0) i = -1 - i;
        }

        // note that while we may directly insert transactions that are for a future epoch we don't own,
        // we don't do this for dependencies as we only want to know these transactions for answering
        // recovery decisions about transactions we do own
        // We make sure to compute dependencies that satisfy all participating epochs for RX/KX
        int toIndex = txnIds.size();
        while (toIndex > 0 && txnIds.get(toIndex - 1).epoch() >= cfk.bounds.endEpoch)
            --toIndex;

        if (i < toIndex)
        {
            // we may have dependencies that execute after us, but in this case we only wait for them to (transitively) commit
            // so we distinguish the input index we need to execute transactions to from the index we need to commit to
            int waitingToExecuteIndex = toIndex;
            if (waitingToExecuteIndex > waitingFromIndex)
            {
                TxnId maxWaiting = txnIds.get(waitingToExecuteIndex - 1);
                Timestamp includeBefore = waitingTxnId.is(EphemeralRead) ? Timestamp.MAX : command.executeAt();
                if (maxWaiting.compareTo(includeBefore) >= 0)
                {
                    waitingToExecuteIndex = txnIds.find(includeBefore);
                    if (waitingToExecuteIndex < 0)
                        waitingToExecuteIndex = -1 - waitingToExecuteIndex;
                }
            }

            boolean readyToApply = true; // our dependencies have applied, so we are ready to apply
            boolean waitingToApply = true; // our dependencies have committed, so we know when we execute and are waiting
            boolean hasFutureDependency = toIndex > waitingFromIndex
                                          && !waitingTxnId.is(EphemeralRead) // ephemeral reads wait for everything they witness, regardless of TxnId or executeAt, so the latest dependency is always enough
                                          && txnIds.get(toIndex - 1).compareTo(command.executeAt()) > 0;

            if (hasFutureDependency)
            {
                // This logic is to handle the case where we have pruned dependencies on the replicas we have used to calculate our dependencies
                // so we may be missing an execution dependency, and we require that we are transitively committed
                int waitingOnCommit = Arrays.binarySearch(byId, txnIds.get(toIndex - 1));
                if (waitingOnCommit < 0) waitingOnCommit = -1 - waitingOnCommit;
                if (cfk.minUndecidedById >= 0 && cfk.minUndecidedById < waitingOnCommit)
                    readyToApply = waitingToApply = false;
            }

            Timestamp waitingToExecuteAt = null; // when the CFK should notify not waiting
            Timestamp effectiveExecutesAt = null; // the executeAt we should report to WaitingOn for ephemeral reads

            int j = SortedArrays.binarySearch(byId, 0, byId.length, txnIds.get(i), Timestamp::compareTo, FAST);
            if (j < 0) j = -1 -j;

            while (i < toIndex)
            {
                int c = j == byId.length ? -1 : txnIds.get(i).compareTo(byId[j]);
                if (c >= 0)
                {
                    TxnInfo txn = byId[j];
                    if (c == 0 || txn.witnessedBy(waitingTxnId))
                    {
                        if (i >= waitingFromIndex && waitingToApply)
                        {
                            if (txn.mayExecute())
                            {
                                if (txn.compareTo(COMMITTED) < 0) waitingToApply = readyToApply = false;
                                else if (i < waitingToExecuteIndex)
                                {
                                    if (txn.compareTo(APPLIED) < 0 && txn.executeAt.compareTo(compareExecuteAt) < 0)
                                    {
                                        readyToApply = false;
                                        waitingToExecuteAt = Timestamp.nonNullOrMax(waitingToExecuteAt, txn.executeAt);
                                    }
                                    else if (txn.is(APPLIED))
                                    {
                                        effectiveExecutesAt = Timestamp.nonNullOrMax(effectiveExecutesAt, txn.executeAt);
                                    }
                                }
                            }
                            else if (waitingTxnId.awaitsOnlyDeps())
                            {
                                // only really need to track epoch, but track max executeAt to support retryInLatestEpoch
                                effectiveExecutesAt = Timestamp.nonNullOrMax(effectiveExecutesAt, txn.executeAt);
                            }
                        }
                        if (c == 0)
                        {
                            if (txn.is(TRANSITIVE) && isTransitiveDependencyVisible(waitingTxnId))
                            {
                                if (newById == null)
                                    newById = byId.clone();
                                newById[j] = TxnInfo.create(txn, TRANSITIVE_VISIBLE, txn.mayExecute(), txn.statusOverrides(), txn.executeAt, txn.missing(), txn.ballot());
                            }
                            ++i;
                        }
                    }
                    ++j;
                }
                else if (register)
                {
                    TxnId insert = txnIds.get(i);
                    if (i >= waitingFromIndex)
                        readyToApply = waitingToApply = false;
                    if (missingCount == missing.length)
                        missing = cachedTxnIds().resize(missing, missingCount, Math.max(8, missingCount + missingCount/2));
                    missing[missingCount++] = insert;
                    ++i;
                }
                else
                {
                    Invariants.require(txnIds.get(i++).compareTo(cfk.prunedBefore()) < 0);
                }
            }

            if (toIndex < txnIds.size() && waitingTxnId.awaitsOnlyDeps())
                effectiveExecutesAt = Timestamp.nonNullOrMax(effectiveExecutesAt, txnIds.get(txnIds.size() - 1));

            // TODO (required): document why we can restrict this test to sync points
            if (waitingToApply && waitingTxnId.isSyncPoint() && Pruning.isAnyPredecessorWaitingOnPruned(cfk.loadingPruned, waitingTxnId))
                readyToApply = waitingToApply = false;

            if (waitingToApply && hasFutureDependency)
            {
                // This logic is to handle the case where we have pruned dependencies on the replicas we have used to calculate our dependencies
                // so we may be missing an execution dependency, and we require that we are transitively committed
                int w = Arrays.binarySearch(byId, command.executeAt());
                if (w < 0) w = -1 - w;
                // TODO (desired): consider moving this logic inline to the main loop body
                while (--w >= 0)
                {
                    TxnInfo txn = byId[w];
                    if (!txn.mayExecute()
                        || !txn.witnessedBy(waitingTxnId)
                        || txn.isAtLeast(APPLIED))
                        continue;

                    Invariants.require(txn.compareTo(COMMITTED) >= 0);
                    if (txn.executeAt.compareTo(compareExecuteAt) >= 0)
                        continue;

                    readyToApply = false;
                    waitingToExecuteAt = Timestamp.nonNullOrMax(waitingToExecuteAt, txn.executeAt);
                }
            }

            waitingToExecuteAt = updateExecuteAtLeast(waitingToExecuteAt, effectiveExecutesAt, safeCommand);
            if (!readyToApply || missingCount > 0 || newById != null)
            {
                if (newById == null) newById = byId;
                TxnInfo[] newCommittedByExecuteAt = cfk.committedByExecuteAt;
                int newMinUndecidedById = cfk.minUndecidedById;
                int newMaxAppliedPreBootstrapWriteById = cfk.maxAppliedPreBootstrapWriteById;
                Object[] newLoadingPruned = cfk.loadingPruned;
                TxnId[] prunedIds = NO_TXNIDS;
                int clearMissingCount = missingCount;
                if (missingCount > 0)
                {
                    int prunedIndex = Arrays.binarySearch(missing, 0, missingCount, cfk.prunedBefore());
                    if (prunedIndex < 0) prunedIndex = -1 - prunedIndex;
                    if (prunedIndex > 0)
                    {
                        prunedIds = Arrays.copyOf(missing, prunedIndex);
                        List<TxnId> insertLoadPruned = new ArrayList<>();
                        newLoadingPruned = Pruning.loadPruned(cfk.loadingPruned, prunedIds, waitingTxnId, insertLoadPruned);
                        prunedIds = insertLoadPruned.isEmpty() ? NO_TXNIDS : insertLoadPruned.toArray(TxnId[]::new);
                    }

                    if (prunedIndex != missingCount)
                    {
                        missingCount -= prunedIndex;
                        System.arraycopy(missing, prunedIndex, missing, 0, missingCount);
                        removeNonIdentityFlags(missing, missingCount);
                        int minUndecidedMissingIndex = 0;
                        while (minUndecidedMissingIndex < missingCount && !cfk.mayExecute(missing[minUndecidedMissingIndex]))
                            ++minUndecidedMissingIndex;
                        TxnId minUndecidedMissing = minUndecidedMissingIndex == missingCount ? null : missing[minUndecidedMissingIndex];
                        TxnId minUndecided = TxnId.nonNullOrMin(minUndecidedMissing, cfk.minUndecided());
                        TxnInfo[] copyById = newById;
                        newById = new TxnInfo[byId.length + missingCount];
                        newCommittedByExecuteAt = insertAdditionsOnly(copyById, cfk.committedByExecuteAt, newById, missing, missingCount, cfk.bounds, waitingTxnId);
                        // we can safely use missing[prunedIndex] here because we only fill missing with transactions for which we manage execution
                        if (minUndecided != null)
                            newMinUndecidedById = Arrays.binarySearch(newById, minUndecided);
                        newMaxAppliedPreBootstrapWriteById = advanceByIdIndex(newMaxAppliedPreBootstrapWriteById, byId, Integer.MAX_VALUE, false, missing, missingCount);
                    }
                }
                cachedTxnIds().discard(missing, clearMissingCount);

                CommandsForKey.Unmanaged[] newUnmanaged = cfk.unmanageds;
                if (!readyToApply && mode != REGISTER_DEPS_ONLY)
                {
                    CommandsForKey.Unmanaged newPendingRecord;
                    if (waitingToApply) newPendingRecord = new CommandsForKey.Unmanaged(APPLY, command.txnId(), waitingToExecuteAt);
                    else newPendingRecord = new CommandsForKey.Unmanaged(COMMIT, command.txnId(), txnIds.get(toIndex - 1));

                    if (update != null)
                    {
                        update.add(newPendingRecord);
                        return cfk;
                    }

                    newUnmanaged = SortedArrays.insert(cfk.unmanageds, newPendingRecord, CommandsForKey.Unmanaged[]::new);
                }

                CommandsForKey newCfk;
                if (newById == byId) newCfk = new CommandsForKey(cfk, newLoadingPruned, newUnmanaged);
                else
                {
                    int prunedBeforeById = cfk.prunedBeforeById;
                    Invariants.require(prunedBeforeById < 0 || newById[prunedBeforeById].equals(cfk.prunedBefore()));
                    newCfk = new CommandsForKey(cfk.key(), cfk.bounds, newById, newMinUndecidedById, newMaxAppliedPreBootstrapWriteById, newCommittedByExecuteAt, cfk.maxAppliedWriteByExecuteAt, cfk.maxUniqueHlc, newLoadingPruned, prunedBeforeById, newUnmanaged);
                }

                CommandsForKeyUpdate result = newCfk;
                if (prunedIds != NO_TXNIDS)
                    result = new CommandsForKeyUpdateWithPostProcess(newCfk, new LoadPruned(result.postProcess(), prunedIds));

                if (readyToApply)
                {
                    paranoidCheckExecutesAtLeast(newCfk, waitingTxnId, effectiveExecutesAt, waitingToExecuteAt);
                    result = new CommandsForKeyUpdateWithPostProcess(newCfk, new NotifyNotWaiting(result.postProcess(), new TxnId[] { safeCommand.txnId() }));
                }

                return result;
            }
        }

        return new CommandsForKeyUpdateWithPostProcess(cfk, new NotifyNotWaiting(null, new TxnId[] { safeCommand.txnId() }));
    }