in accord-core/src/main/java/accord/messages/Propagate.java [183:318]
public Void apply(SafeCommandStore safeStore)
{
long executeAtEpoch = committedExecuteAt == null ? txnId.epoch() : committedExecuteAt.epoch();
StoreParticipants participants = StoreParticipants.update(safeStore, route, lowEpoch, txnId, executeAtEpoch, highEpoch, committedExecuteAt != null);
// TODO (expected): can we come up with a better more universal pattern for avoiding updating a command we don't intersect with?
// ideally integrated with safeStore.get()
if (participants.owns().isEmpty() && safeStore.ifInitialised(txnId) == null)
return null;
SafeCommand safeCommand = safeStore.get(txnId, participants);
Command command = safeCommand.current();
Timestamp executeAtIfKnown = command.executeAtIfKnown(committedExecuteAt);
if (participants.executes() == null && executeAtIfKnown != null)
{
executeAtEpoch = executeAtIfKnown.epoch();
participants = StoreParticipants.update(safeStore, route, lowEpoch, txnId, executeAtEpoch, highEpoch, true);
}
switch (command.saveStatus().phase)
{
// Already know the outcome, waiting on durability so maybe update with new durability information which can also trigger cleanup
case Persist: return updateDurability(safeStore, safeCommand, participants);
case Cleanup:
case Invalidate:
return null;
}
participants = participants.supplement(command.participants())
.filter(UPDATE, safeStore, txnId, executeAtIfKnown);
Known found = known.knownFor(participants.stillOwns(), participants.stillTouches());
Known currentlyKnown = command.known();
if (!currentlyKnown.has(FullRoute) && Route.isFullRoute(command.route()))
currentlyKnown = currentlyKnown.with(FullRoute);
PartialTxn partialTxn = null;
if (found.hasDefinition())
partialTxn = this.partialTxn.intersecting(participants.stillOwns(), true).reconstitutePartial(participants.stillOwns());
PartialDeps stableDeps = null;
if (found.hasDecidedDeps())
stableDeps = this.stableDeps.intersecting(participants.stillTouches()).reconstitutePartial(participants.stillTouches());
boolean isShardTruncated = withQuorum == HasQuorum && known.hasAnyFullyTruncated(participants.stillTouches());
if (isShardTruncated)
{
found = tryUpgradeTruncated(safeStore, safeCommand, participants, command, executeAtIfKnown, found);
if (found == null)
{
// TODO (expected): should be ownsOrExecutes()?
updateFetchResult(Nothing, participants.owns());
return null;
}
if (command.known().is(DepsKnown))
{
// keep the deps we already have
participants = command.participants().supplement(participants);
}
else
{
Participants<?> depsNeeds = participants.stillTouches();
if (found.hasDecidedDeps() && stableDeps == null && this.stableDeps != null)
{
Invariants.require(executeAtIfKnown != null);
// we don't subtract existing partialDeps, as they cannot be committed deps; we only permit committing deps covering all participating ranges
stableDeps = this.stableDeps.intersecting(depsNeeds).reconstitutePartial(depsNeeds);
}
}
Participants<?> txnNeeds = participants.stillOwnsOrWaitsOn(txnId);
if (found.isDefinitionKnown() && partialTxn == null && this.partialTxn != null)
{
PartialTxn existing = command.partialTxn();
Participants<?> neededExtra = txnNeeds;
if (existing != null) neededExtra = neededExtra.without(existing.keys().toParticipants());
partialTxn = this.partialTxn.intersecting(neededExtra, true).reconstitutePartial(neededExtra);
}
}
SaveStatus propagate = found.atLeast(currentlyKnown).propagatesSaveStatus();
if (propagate.known.isSatisfiedBy(currentlyKnown))
{
updateFetchResult(found, participants.owns());
return updateDurability(safeStore, safeCommand, participants);
}
switch (propagate.status)
{
default: throw illegalState("Unexpected status: " + propagate);
case Truncated: throw illegalState("Status expected to be handled elsewhere: " + propagate);
case AcceptedMedium:
case AcceptedSlow:
case AcceptedInvalidate:
// we never "propagate" accepted statuses as these are essentially votes,
// and contribute nothing to our local state machine
throw illegalState("Invalid states to propagate: " + propagate);
case Invalidated:
Commands.commitInvalidate(safeStore, safeCommand, route);
break;
case Applied:
case PreApplied:
Invariants.require(committedExecuteAt != null);
// we must use the remote executeAt, as it might have a uniqueHlc we aren't aware of at commit
confirm(Commands.apply(safeStore, safeCommand, participants, txnId, route, committedExecuteAt, stableDeps, partialTxn, writes, result));
break;
case Stable:
confirm(Commands.commit(safeStore, safeCommand, participants, Stable, acceptedOrCommitted, txnId, route, partialTxn, executeAtIfKnown, stableDeps, null));
break;
case Committed:
// TODO (expected): we can propagate Committed as Stable if we have any other Stable result AND a quorum of committedDeps
case PreCommitted:
confirm(Commands.precommit(safeStore, safeCommand, participants, txnId, executeAtIfKnown, promised));
// TODO (desired): would it be clearer to yield a SaveStatus so we can have PreCommittedWithDefinition
if (!found.definition().isKnown())
break;
case PreAccepted:
// only preaccept if we coordinate the transaction
if (safeStore.ranges().coordinates(txnId).intersects(route) && Route.isFullRoute(route))
Commands.preaccept(safeStore, safeCommand, participants, txnId, partialTxn, null, false);
case NotDefined:
if (invalidIf == IfUncommitted)
safeStore.progressLog().invalidIfUncommitted(txnId);
break;
}
updateFetchResult(found.propagates(), participants.owns());
return updateDurability(safeStore, safeCommand, participants);
}