in accord-core/src/main/java/accord/local/cfk/Updating.java [878:1133]
static CommandsForKeyUpdate updateUnmanaged(CommandsForKey cfk, SafeCommand safeCommand, UpdateUnmanagedMode mode, @Nullable List<CommandsForKey.Unmanaged> update)
{
boolean register = mode != UPDATE;
Invariants.requireArgument(mode == UPDATE || update == null);
if (safeCommand.current().hasBeen(Status.Truncated))
return cfk;
Command orig = safeCommand.current();
if (orig.saveStatus() == Uninitialised && orig.txnId().is(EphemeralRead))
return cfk;
Command.Committed command = safeCommand.current().asCommitted();
TxnId waitingTxnId = command.txnId();
// used only to decide if an executeAt is included _on the assumption the TxnId is_. For ?[EX] this is all timestamps
Timestamp compareExecuteAt = waitingTxnId.awaitsOnlyDeps() ? Timestamp.MAX : command.executeAt();
TxnInfo[] byId = cfk.byId, newById = null;
RelationMultiMap.SortedRelationList<TxnId> txnIds = command.partialDeps().keyDeps.txnIdsWithFlags(cfk.key());
TxnId[] missing = NO_TXNIDS;
int missingCount = 0;
// we only populate dependencies to facilitate execution, not for any distributed decision,
// so we can filter to only transactions we need to execute locally
int i = txnIds.find(cfk.redundantOrBootstrappedBefore());
if (i < 0) i = -1 - i;
int waitingFromIndex = i; // the min input index we expect to execute
if (waitingTxnId.is(ExclusiveSyncPoint) && waitingTxnId.is(Range) && mode == REGISTER)
{
// for RX we register all our transitive dependencies to make sure we can answer coordinated dependency calculations
// in this case we separate out the position from which we insert missing txnId and where we compute readiness to execute
i = txnIds.find(cfk.redundantBefore());
if (i < 0) i = -1 - i;
}
// note that while we may directly insert transactions that are for a future epoch we don't own,
// we don't do this for dependencies as we only want to know these transactions for answering
// recovery decisions about transactions we do own
// We make sure to compute dependencies that satisfy all participating epochs for RX/KX
int toIndex = txnIds.size();
while (toIndex > 0 && txnIds.get(toIndex - 1).epoch() >= cfk.bounds.endEpoch)
--toIndex;
if (i < toIndex)
{
// we may have dependencies that execute after us, but in this case we only wait for them to (transitively) commit
// so we distinguish the input index we need to execute transactions to from the index we need to commit to
int waitingToExecuteIndex = toIndex;
if (waitingToExecuteIndex > waitingFromIndex)
{
TxnId maxWaiting = txnIds.get(waitingToExecuteIndex - 1);
Timestamp includeBefore = waitingTxnId.is(EphemeralRead) ? Timestamp.MAX : command.executeAt();
if (maxWaiting.compareTo(includeBefore) >= 0)
{
waitingToExecuteIndex = txnIds.find(includeBefore);
if (waitingToExecuteIndex < 0)
waitingToExecuteIndex = -1 - waitingToExecuteIndex;
}
}
boolean readyToApply = true; // our dependencies have applied, so we are ready to apply
boolean waitingToApply = true; // our dependencies have committed, so we know when we execute and are waiting
boolean hasFutureDependency = toIndex > waitingFromIndex
&& !waitingTxnId.is(EphemeralRead) // ephemeral reads wait for everything they witness, regardless of TxnId or executeAt, so the latest dependency is always enough
&& txnIds.get(toIndex - 1).compareTo(command.executeAt()) > 0;
if (hasFutureDependency)
{
// This logic is to handle the case where we have pruned dependencies on the replicas we have used to calculate our dependencies
// so we may be missing an execution dependency, and we require that we are transitively committed
int waitingOnCommit = Arrays.binarySearch(byId, txnIds.get(toIndex - 1));
if (waitingOnCommit < 0) waitingOnCommit = -1 - waitingOnCommit;
if (cfk.minUndecidedById >= 0 && cfk.minUndecidedById < waitingOnCommit)
readyToApply = waitingToApply = false;
}
Timestamp waitingToExecuteAt = null; // when the CFK should notify not waiting
Timestamp effectiveExecutesAt = null; // the executeAt we should report to WaitingOn for ephemeral reads
int j = SortedArrays.binarySearch(byId, 0, byId.length, txnIds.get(i), Timestamp::compareTo, FAST);
if (j < 0) j = -1 -j;
while (i < toIndex)
{
int c = j == byId.length ? -1 : txnIds.get(i).compareTo(byId[j]);
if (c >= 0)
{
TxnInfo txn = byId[j];
if (c == 0 || txn.witnessedBy(waitingTxnId))
{
if (i >= waitingFromIndex && waitingToApply)
{
if (txn.mayExecute())
{
if (txn.compareTo(COMMITTED) < 0) waitingToApply = readyToApply = false;
else if (i < waitingToExecuteIndex)
{
if (txn.compareTo(APPLIED) < 0 && txn.executeAt.compareTo(compareExecuteAt) < 0)
{
readyToApply = false;
waitingToExecuteAt = Timestamp.nonNullOrMax(waitingToExecuteAt, txn.executeAt);
}
else if (txn.is(APPLIED))
{
effectiveExecutesAt = Timestamp.nonNullOrMax(effectiveExecutesAt, txn.executeAt);
}
}
}
else if (waitingTxnId.awaitsOnlyDeps())
{
// only really need to track epoch, but track max executeAt to support retryInLatestEpoch
effectiveExecutesAt = Timestamp.nonNullOrMax(effectiveExecutesAt, txn.executeAt);
}
}
if (c == 0)
{
if (txn.is(TRANSITIVE) && isTransitiveDependencyVisible(waitingTxnId))
{
if (newById == null)
newById = byId.clone();
newById[j] = TxnInfo.create(txn, TRANSITIVE_VISIBLE, txn.mayExecute(), txn.statusOverrides(), txn.executeAt, txn.missing(), txn.ballot());
}
++i;
}
}
++j;
}
else if (register)
{
TxnId insert = txnIds.get(i);
if (i >= waitingFromIndex)
readyToApply = waitingToApply = false;
if (missingCount == missing.length)
missing = cachedTxnIds().resize(missing, missingCount, Math.max(8, missingCount + missingCount/2));
missing[missingCount++] = insert;
++i;
}
else
{
Invariants.require(txnIds.get(i++).compareTo(cfk.prunedBefore()) < 0);
}
}
if (toIndex < txnIds.size() && waitingTxnId.awaitsOnlyDeps())
effectiveExecutesAt = Timestamp.nonNullOrMax(effectiveExecutesAt, txnIds.get(txnIds.size() - 1));
// TODO (required): document why we can restrict this test to sync points
if (waitingToApply && waitingTxnId.isSyncPoint() && Pruning.isAnyPredecessorWaitingOnPruned(cfk.loadingPruned, waitingTxnId))
readyToApply = waitingToApply = false;
if (waitingToApply && hasFutureDependency)
{
// This logic is to handle the case where we have pruned dependencies on the replicas we have used to calculate our dependencies
// so we may be missing an execution dependency, and we require that we are transitively committed
int w = Arrays.binarySearch(byId, command.executeAt());
if (w < 0) w = -1 - w;
// TODO (desired): consider moving this logic inline to the main loop body
while (--w >= 0)
{
TxnInfo txn = byId[w];
if (!txn.mayExecute()
|| !txn.witnessedBy(waitingTxnId)
|| txn.isAtLeast(APPLIED))
continue;
Invariants.require(txn.compareTo(COMMITTED) >= 0);
if (txn.executeAt.compareTo(compareExecuteAt) >= 0)
continue;
readyToApply = false;
waitingToExecuteAt = Timestamp.nonNullOrMax(waitingToExecuteAt, txn.executeAt);
}
}
waitingToExecuteAt = updateExecuteAtLeast(waitingToExecuteAt, effectiveExecutesAt, safeCommand);
if (!readyToApply || missingCount > 0 || newById != null)
{
if (newById == null) newById = byId;
TxnInfo[] newCommittedByExecuteAt = cfk.committedByExecuteAt;
int newMinUndecidedById = cfk.minUndecidedById;
int newMaxAppliedPreBootstrapWriteById = cfk.maxAppliedPreBootstrapWriteById;
Object[] newLoadingPruned = cfk.loadingPruned;
TxnId[] prunedIds = NO_TXNIDS;
int clearMissingCount = missingCount;
if (missingCount > 0)
{
int prunedIndex = Arrays.binarySearch(missing, 0, missingCount, cfk.prunedBefore());
if (prunedIndex < 0) prunedIndex = -1 - prunedIndex;
if (prunedIndex > 0)
{
prunedIds = Arrays.copyOf(missing, prunedIndex);
List<TxnId> insertLoadPruned = new ArrayList<>();
newLoadingPruned = Pruning.loadPruned(cfk.loadingPruned, prunedIds, waitingTxnId, insertLoadPruned);
prunedIds = insertLoadPruned.isEmpty() ? NO_TXNIDS : insertLoadPruned.toArray(TxnId[]::new);
}
if (prunedIndex != missingCount)
{
missingCount -= prunedIndex;
System.arraycopy(missing, prunedIndex, missing, 0, missingCount);
removeNonIdentityFlags(missing, missingCount);
int minUndecidedMissingIndex = 0;
while (minUndecidedMissingIndex < missingCount && !cfk.mayExecute(missing[minUndecidedMissingIndex]))
++minUndecidedMissingIndex;
TxnId minUndecidedMissing = minUndecidedMissingIndex == missingCount ? null : missing[minUndecidedMissingIndex];
TxnId minUndecided = TxnId.nonNullOrMin(minUndecidedMissing, cfk.minUndecided());
TxnInfo[] copyById = newById;
newById = new TxnInfo[byId.length + missingCount];
newCommittedByExecuteAt = insertAdditionsOnly(copyById, cfk.committedByExecuteAt, newById, missing, missingCount, cfk.bounds, waitingTxnId);
// we can safely use missing[prunedIndex] here because we only fill missing with transactions for which we manage execution
if (minUndecided != null)
newMinUndecidedById = Arrays.binarySearch(newById, minUndecided);
newMaxAppliedPreBootstrapWriteById = advanceByIdIndex(newMaxAppliedPreBootstrapWriteById, byId, Integer.MAX_VALUE, false, missing, missingCount);
}
}
cachedTxnIds().discard(missing, clearMissingCount);
CommandsForKey.Unmanaged[] newUnmanaged = cfk.unmanageds;
if (!readyToApply && mode != REGISTER_DEPS_ONLY)
{
CommandsForKey.Unmanaged newPendingRecord;
if (waitingToApply) newPendingRecord = new CommandsForKey.Unmanaged(APPLY, command.txnId(), waitingToExecuteAt);
else newPendingRecord = new CommandsForKey.Unmanaged(COMMIT, command.txnId(), txnIds.get(toIndex - 1));
if (update != null)
{
update.add(newPendingRecord);
return cfk;
}
newUnmanaged = SortedArrays.insert(cfk.unmanageds, newPendingRecord, CommandsForKey.Unmanaged[]::new);
}
CommandsForKey newCfk;
if (newById == byId) newCfk = new CommandsForKey(cfk, newLoadingPruned, newUnmanaged);
else
{
int prunedBeforeById = cfk.prunedBeforeById;
Invariants.require(prunedBeforeById < 0 || newById[prunedBeforeById].equals(cfk.prunedBefore()));
newCfk = new CommandsForKey(cfk.key(), cfk.bounds, newById, newMinUndecidedById, newMaxAppliedPreBootstrapWriteById, newCommittedByExecuteAt, cfk.maxAppliedWriteByExecuteAt, cfk.maxUniqueHlc, newLoadingPruned, prunedBeforeById, newUnmanaged);
}
CommandsForKeyUpdate result = newCfk;
if (prunedIds != NO_TXNIDS)
result = new CommandsForKeyUpdateWithPostProcess(newCfk, new LoadPruned(result.postProcess(), prunedIds));
if (readyToApply)
{
paranoidCheckExecutesAtLeast(newCfk, waitingTxnId, effectiveExecutesAt, waitingToExecuteAt);
result = new CommandsForKeyUpdateWithPostProcess(newCfk, new NotifyNotWaiting(result.postProcess(), new TxnId[] { safeCommand.txnId() }));
}
return result;
}
}
return new CommandsForKeyUpdateWithPostProcess(cfk, new NotifyNotWaiting(null, new TxnId[] { safeCommand.txnId() }));
}