in accord-core/src/main/java/accord/local/SafeCommandStore.java [395:462]
private static void updateUnmanagedCommandsForKey(SafeCommandStore safeStore, Command next, UpdateUnmanagedMode mode)
{
TxnId txnId = next.txnId();
RoutingKeys keys;
if (!txnId.is(Kind.ExclusiveSyncPoint)) keys = next.asCommitted().waitingOn().keys;
else
{
Command.WaitingOn waitingOn = next.asCommitted().waitingOn;
RedundantBefore redundantBefore = safeStore.redundantBefore();
KeyDeps deps = next.partialDeps().keyDeps;
keys = deps.keys();
SimpleBitSet select = new SimpleBitSet(keys.size());
for (int i = 0 ; i < keys.size() ; ++i)
{
if (waitingOn.isWaitingOnKey(i))
{
select.set(i);
continue;
}
SortedList<TxnId> txnIdsForKey = deps.txnIdsForKeyIndex(i);
RoutingKey key = keys.get(i);
TxnId maxTxnId = txnIdsForKey.get(txnIdsForKey.size() - 1);
// TODO (desired): convert to O(n) merge
RedundantStatus status = redundantBefore.status(maxTxnId, null, key);
if (!status.all(SHARD_APPLIED) || !status.all(LOCALLY_REDUNDANT))
select.set(i);
}
if (select.getSetBitCount() != keys.size())
{
RoutingKey[] array = new RoutingKey[select.getSetBitCount()];
int count = 0;
for (int i = 0 ; i < keys.size() ; ++i)
{
if (select.get(i))
array[count++] = keys.get(i);
}
keys = RoutingKeys.ofSortedUnique(array);
}
}
// TODO (required): use StoreParticipants.executes()
// TODO (required): consider how execution works for transactions that await future deps and where the command store inherits additional keys in execution epoch
PreLoadContext context = PreLoadContext.contextFor(txnId, keys, INCR);
PreLoadContext execute = safeStore.canExecute(context);
// TODO (expected): execute immediately for any keys we already have loaded, and save only those we haven't for later
if (execute != null)
{
updateUnmanagedCommandsForKey(safeStore, execute.keys(), txnId, mode);
}
if (execute == context)
{
if (next.txnId().is(Range))
registerTransitive(safeStore, txnId, next);
}
else
{
if (execute != null)
context = PreLoadContext.contextFor(txnId, keys.without(execute.keys()), INCR);
safeStore = safeStore;
CommandStore unsafeStore = safeStore.commandStore();
AsyncChain<Void> submit = unsafeStore.build(context, safeStore0 -> { updateUnmanagedCommandsForKey(safeStore0, safeStore0.context().keys() , txnId, mode); });
if (next.txnId().is(Range))
submit = submit.flatMap(success -> unsafeStore.build(PreLoadContext.empty(), safeStore0 -> { registerTransitive(safeStore0, txnId, next); }));
submit.begin(safeStore.commandStore().agent);
}
}