static CommandsForKey pruneBefore()

in accord-core/src/main/java/accord/local/cfk/Pruning.java [315:489]


    /**
     * Produces a copy of {@code cfk} in which the entries of {@code cfk.byId} at indexes {@code [0, pos)} have been
     * filtered against {@code newPrunedBefore}, with the corresponding entries also removed from
     * {@code cfk.committedByExecuteAt}. Entries at index {@code pos} and above are copied across unchanged.
     *
     * <p>Entries below {@code pos} are visited from highest index to lowest and handled by status:
     * <ul>
     *   <li>COMMITTED / STABLE: kept if {@code mayExecute()}; otherwise kept only if the entry is itself the active
     *       prune bound, or the active bound is {@code newPrunedBefore} and its executeAt is not after the entry's.</li>
     *   <li>TRANSITIVE, TRANSITIVE_VISIBLE, PREACCEPTED_*, NOTACCEPTED, ACCEPTED: always kept.</li>
     *   <li>APPLIED_*: dropped when both its id and its executeAt sort before the active bound and its
     *       {@code missing()} array is empty or a subset of the missing ids accumulated so far; otherwise kept.</li>
     *   <li>INVALIDATED / PRUNED / ERASED: always dropped.</li>
     * </ul>
     *
     * <p>The "active prune bound" starts as {@code newPrunedBefore}. When {@code buildEpochPrunedBefores} returns a
     * non-null map, the bound is re-read from that map each time the epoch of the visited entry changes, and may then
     * be {@code null} (no bound for that epoch).
     *
     * @param cfk             the collection to prune; not modified
     * @param newPrunedBefore the new prune bound; must compare greater than {@code cfk.prunedBefore()} (both the entry
     *                        and its executeAt, via {@code compareSimultaneousEpochAndHlc}) and must satisfy
     *                        {@code mayExecute()}
     * @param pos             number of leading {@code byId} entries to consider; the final invariant check requires
     *                        {@code byId[pos]} to be {@code newPrunedBefore}
     * @return {@code cfk} itself when every entry below {@code pos} is retained, otherwise a new
     *         {@code CommandsForKey} built from the filtered arrays
     */
    static CommandsForKey pruneBefore(CommandsForKey cfk, TxnInfo newPrunedBefore, int pos)
    {
        Invariants.requireArgument(newPrunedBefore.compareSimultaneousEpochAndHlc(cfk.prunedBefore()) > 0, "Expect new prunedBefore to have > HLC and >= epoch (%s vs %s)", newPrunedBefore, cfk.prunedBefore());
        Invariants.requireArgument(newPrunedBefore.executeAt.compareSimultaneousEpochAndHlc(cfk.prunedBefore().executeAt) > 0, "Expect new prunedBefore.executeAt to have > HLC and >= epoch (%s vs %s)", newPrunedBefore.executeAt, cfk.prunedBefore().executeAt);
        Invariants.requireArgument(newPrunedBefore.mayExecute());

        TxnInfo[] byId = cfk.byId;
        TxnInfo[] committedByExecuteAt = cfk.committedByExecuteAt;
        int minUndecidedById;
        int retainCount = 0, removedCommittedCount = 0;
        // a store of committed executeAts we have removed where we cannot otherwise cheaply infer it
        Object[] removedExecuteAts = NO_TXNIDS;
        int removedExecuteAtCount = 0;
        Long2ObjectHashMap<TxnInfo> epochPrunedBefores = buildEpochPrunedBefores(byId, committedByExecuteAt, newPrunedBefore);
        TxnInfo[] newById;
        {
            minUndecidedById = cfk.minUndecidedById;
            // value of retainCount at the moment the entry at index minUndecidedById was retained
            int minUndecidedByIdDelta = 0;
            RecursiveObjectBuffers<TxnId> missingBuffers = new RecursiveObjectBuffers<>(cachedTxnIds());
            // running union of missing ids; seeded from the bound and extended by retained APPLIED entries below
            TxnId[] mergedMissing = newPrunedBefore.missing();
            int mergedMissingCount = mergedMissing.length;
            TxnInfo activePruneEpochBefore = newPrunedBefore; // note that if activePrunedBefore != newPrunedBefore, we compare with executeAt
            long activePruneEpoch = activePruneEpochBefore.epoch();
            // scratch array, filled from the top (index pos - 1) downwards so retained entries keep their relative order
            Object[] newByIdBuffer = cachedAny().get(pos);

            for (int i = pos - 1 ; i >= 0 ; --i)
            {
                TxnInfo txn = byId[i];
                switch (txn.status())
                {
                    default: throw new UnhandledEnum(txn.status());
                    case COMMITTED:
                    case STABLE:
                        if (txn.mayExecute())
                        {
                            newByIdBuffer[pos - ++retainCount] = txn;
                        }
                        else
                        {
                            long epoch = txn.epoch();
                            if (epoch != activePruneEpoch && epochPrunedBefores != null)
                            {
                                // may yield null, in which case neither condition below holds and txn is dropped
                                activePruneEpochBefore = epochPrunedBefores.get(epoch);
                                activePruneEpoch = epoch;
                            }

                            if (activePruneEpochBefore == txn || (activePruneEpochBefore == newPrunedBefore && activePruneEpochBefore.executeAt.compareTo(txn.executeAt) <= 0))
                                newByIdBuffer[pos - ++retainCount] = txn;
                        }
                        break;

                    case TRANSITIVE:
                    case TRANSITIVE_VISIBLE:
                    case PREACCEPTED_WITHOUT_DEPS:
                    case PREACCEPTED_WITH_DEPS:
                    case PREACCEPTED_COORD_NO_FAST_COMMIT:
                    case NOTACCEPTED:
                    case ACCEPTED:
                        newByIdBuffer[pos - ++retainCount] = txn;
                        if (i == minUndecidedById)
                            minUndecidedByIdDelta = retainCount;
                        break;

                    case APPLIED_NOT_DURABLE:
                    case APPLIED_DURABLE:
                    case APPLIED_NOT_EXECUTED:
                        long epoch = txn.epoch();
                        if (epoch != activePruneEpoch && epochPrunedBefores != null)
                        {
                            activePruneEpochBefore = epochPrunedBefores.get(epoch);
                            activePruneEpoch = epoch;
                        }

                        // only a candidate for removal if it sorts before the active bound by both executeAt and id
                        boolean tryPrune = activePruneEpochBefore != null
                                           && txn.executeAt.compareTo(activePruneEpochBefore.executeAt) < 0
                                           && txn.compareTo(activePruneEpochBefore) < 0;
                        if (tryPrune)
                        {
                            TxnId[] missing = txn.missing();
                            if (missing == NO_TXNIDS || SortedArrays.isSubset(missing, 0, missing.length, mergedMissing, 0, mergedMissingCount))
                            {
                                if (txn.mayExecute())
                                {   // if we don't execute, we don't track in committedByExecuteAt, so don't need to update bookkeeping for removing from there
                                    if (missing != NO_TXNIDS)
                                    {
                                        // the second pass cannot recognise this removal from missing == NO_TXNIDS alone,
                                        // so remember its executeAt explicitly
                                        if (removedExecuteAtCount == removedExecuteAts.length)
                                            removedExecuteAts = cachedAny().resize(removedExecuteAts, removedExecuteAtCount, Math.max(8, removedExecuteAtCount + (removedExecuteAtCount >> 1)));
                                        removedExecuteAts[removedExecuteAtCount++] = txn.executeAt;
                                    }
                                    ++removedCommittedCount;
                                }
                                continue; // dropped: not copied into newByIdBuffer
                            }

                            // reference comparison: only entries whose executeAt is the entry object itself contribute
                            if (txn.executeAt == txn)
                            {
                                mergedMissing = SortedArrays.linearUnion(missing, missing.length, mergedMissing, mergedMissingCount, missingBuffers);
                                mergedMissingCount = missingBuffers.sizeOfLast(mergedMissing);
                            }
                        }
                        newByIdBuffer[pos - ++retainCount] = txn;
                        break; // explicit: do not rely on falling through into the case below

                    case INVALIDATED:
                    case PRUNED:
                    case ERASED:
                        // never retained below pos
                        break;
                }
            }

            if (pos == retainCount)
            {
                // nothing was removed; hand back the scratch buffers before returning the input unchanged
                missingBuffers.discardBuffers();
                // NOTE(review): assumes forceDiscard(buffer, n) clears the first n slots before pooling, as its
                // existing use for removedExecuteAts suggests - confirm
                cachedAny().forceDiscard(newByIdBuffer, pos);
                return cfk;
            }

            int removedByIdCount = pos - retainCount;
            if (minUndecidedById >= 0)
            {
                if (minUndecidedById >= pos)
                    minUndecidedById -= removedByIdCount; // untouched region simply shifts down
                else
                    minUndecidedById = retainCount - minUndecidedByIdDelta; // recover its index within the retained prefix
            }
            newById = new TxnInfo[byId.length - removedByIdCount];
            // retained entries occupy the top retainCount slots of the scratch array
            System.arraycopy(newByIdBuffer, pos - retainCount, newById, 0, retainCount);
            System.arraycopy(byId, pos, newById, retainCount, byId.length - pos);
            // the scratch array has been fully copied out; return it to the cache like the other temporaries
            cachedAny().forceDiscard(newByIdBuffer, pos);
            missingBuffers.discardBuffers();
        }

        TxnInfo[] newCommittedByExecuteAt;
        {   // copy to new committedByExecuteAt array
            // sorted ascending so it can be consumed from the end while walking committedByExecuteAt backwards
            Arrays.sort(removedExecuteAts, 0, removedExecuteAtCount);
            newCommittedByExecuteAt = new TxnInfo[committedByExecuteAt.length - removedCommittedCount];
            int sourcePos = Arrays.binarySearch(committedByExecuteAt, newPrunedBefore, TxnInfo::compareExecuteAt);
            int insertPos = sourcePos - removedCommittedCount;
            int removedExecuteAtPos = removedExecuteAtCount - 1;
            TxnInfo activePruneEpochBefore = newPrunedBefore; // note that if activePrunedBefore != newPrunedBefore, we compare with executeAt
            long activePruneEpoch = newPrunedBefore.epoch();
            for (int i = sourcePos - 1; i >= 0 ; --i)
            {
                TxnInfo txn = committedByExecuteAt[i];
                if (txn.is(APPLIED))
                {
                    long epoch = txn.epoch();
                    if (epoch != activePruneEpoch && epochPrunedBefores != null)
                    {
                        activePruneEpochBefore = epochPrunedBefores.get(epoch);
                        activePruneEpoch = epoch;
                    }

                    // same candidate test as the byId pass
                    boolean tryPrune = activePruneEpochBefore != null
                                       && txn.compareTo(activePruneEpochBefore) < 0
                                       && txn.executeAt.compareTo(activePruneEpochBefore.executeAt) < 0;
                    if (tryPrune)
                    {
                        TxnId[] missing = txn.missing();
                        // the byId pass drops every candidate with no missing ids, so drop it here too
                        if (missing == NO_TXNIDS)
                            continue;

                        // otherwise it was dropped only if its executeAt was recorded (same object reference)
                        if (removedExecuteAtPos >= 0 && removedExecuteAts[removedExecuteAtPos] == txn.executeAt)
                        {
                            --removedExecuteAtPos;
                            continue;
                        }
                    }
                }

                newCommittedByExecuteAt[--insertPos] = txn;
            }
            // entries from the bound onwards are copied verbatim, shifted down by the number removed
            System.arraycopy(committedByExecuteAt, sourcePos, newCommittedByExecuteAt, sourcePos - removedCommittedCount, committedByExecuteAt.length - sourcePos);
        }

        cachedAny().forceDiscard(removedExecuteAts, removedExecuteAtCount);
        int newMaxAppliedWriteByExecuteAt = cfk.maxAppliedWriteByExecuteAt - removedCommittedCount;
        int newMaxAppliedPreBootstrapWriteById = recomputeMaxAppliedPreBootstrapWriteById(cfk.bounds, newById, cfk.maxAppliedPreBootstrapWriteById);
        // newById[retainCount] is the old byId[pos], which must be the bound itself
        Invariants.require(newById[retainCount] == newPrunedBefore);
        return new CommandsForKey(cfk.key, cfk.bounds, newById, minUndecidedById, newMaxAppliedPreBootstrapWriteById, newCommittedByExecuteAt, newMaxAppliedWriteByExecuteAt, cfk.maxUniqueHlc, cfk.loadingPruned, retainCount, cfk.unmanageds);
    }