in accord-core/src/main/java/accord/impl/progresslog/WaitingState.java [392:481]
/**
 * Performs one attempt at making progress for a transaction this store is waiting on.
 *
 * <p>After validating its preconditions and marking the entry as {@code Querying}, the method picks
 * exactly one of the following actions and returns:
 * <ol>
 *   <li>{@code fetchRoute(...)} when {@code command.maxContactable()} is not yet a {@code Route};</li>
 *   <li>{@code awaitHomeKey(...)} when {@code homeSatisfies()} orders before {@code blockedUntil};</li>
 *   <li>{@code fetch(...)} including the home key when the command has not been {@code PreCommitted};</li>
 *   <li>{@code fetch(...)} using the computed fetch route when the await route is a home-key-only route,
 *       or when the await round cursor has moved past the end of the await route;</li>
 *   <li>{@code awaitSlice(...)} on the current round's slice of the await route otherwise.</li>
 * </ol>
 *
 * <p>If the low/high epochs changed since the previous attempt, the await round cursor is first
 * re-based from the previous await route onto the new one.
 *
 * @param safeStore   the command store the state is read from and updated through
 * @param safeCommand the command being waited on; its current value and txnId are used
 * @param owner       the progress log that owns this state
 */
private void runInternal(SafeCommandStore safeStore, SafeCommand safeCommand, DefaultProgressLog owner)
{
    BlockedUntil blockedUntil = blockedUntil();
    Command command = safeCommand.current();
    // NOTE: the local txnId is only declared further down, so here txnId resolves to a name declared
    // outside this method (presumably a field of the enclosing state - confirm against the class)
    Invariants.require(!owner.hasActive(Waiting, txnId));
    Invariants.require(command.saveStatus().compareTo(blockedUntil.unblockedFrom) < 0,
                       "Command has met desired criteria (%s) but progress log entry has not been cancelled: %s", blockedUntil.unblockedFrom, command);

    set(safeStore, owner, blockedUntil, Querying);
    TxnId txnId = safeCommand.txnId();
    // first make sure we have enough information to obtain the command locally
    Timestamp executeAt = command.executeAtIfKnown();
    Participants<?> fetchKeys = Invariants.nonNull(command.maxContactable());

    if (!Route.isRoute(fetchKeys))
    {
        // we do not have a route yet, so the only thing we can do is try to fetch one
        long lowEpoch = updateLowEpoch(safeStore, txnId, command);
        long highEpoch = updateHighEpoch(safeStore, txnId, blockedUntil, command, executeAt);
        fetchRoute(owner, blockedUntil, txnId, executeAt, lowEpoch, highEpoch, fetchKeys);
        return;
    }

    Route<?> route = Route.castToRoute(fetchKeys);
    if (homeSatisfies().compareTo(blockedUntil) < 0)
    {
        // first wait until the homeKey has progressed to a point where it can answer our query; we don't expect our shards to know until then anyway
        awaitHomeKey(owner, blockedUntil, txnId, executeAt, route);
        return;
    }

    // read the previous epoch bounds BEFORE updating them, so that a change can be detected below
    long prevLowEpoch = readLowEpoch(safeStore, txnId, route);
    long prevHighEpoch = readHighEpoch(safeStore, txnId, route);
    long lowEpoch = updateLowEpoch(safeStore, txnId, command);
    long highEpoch = updateHighEpoch(safeStore, txnId, blockedUntil, command, executeAt);

    // TODO (expected): split into txn and deps sources
    Route<?> slicedRoute = slicedRoute(safeStore, txnId, route, lowEpoch, highEpoch);
    Invariants.require(!slicedRoute.isEmpty());
    if (!command.hasBeen(Status.PreCommitted))
    {
        // we know it has been decided one way or the other by the home shard at least, so we attempt a fetch
        // including the home shard to get us to at least PreCommitted where we can safely wait on individual shards
        fetch(owner, blockedUntil, txnId, executeAt, lowEpoch, highEpoch, slicedRoute, slicedRoute.withHomeKey(), route);
        return;
    }

    // the awaitRoute may be only the home shard, if that is sufficient to indicate the fetchRoute will be able to answer our query;
    // the fetchRoute may also be only the home shard, if that is sufficient to answer our query (e.g. for executeAt)
    Route<?> awaitRoute = awaitRoute(slicedRoute, blockedUntil);
    Route<?> fetchRoute = fetchRoute(slicedRoute, awaitRoute, blockedUntil, safeStore, lowEpoch, txnId, highEpoch, route);

    if (awaitRoute.isHomeKeyOnlyRoute())
    {
        // at this point we can switch to polling as we know someone has the relevant state
        fetch(owner, blockedUntil, txnId, executeAt, lowEpoch, highEpoch, slicedRoute, fetchRoute, route);
        return;
    }

    int roundSize = awaitRoundSize(awaitRoute);
    if (hasNewLowEpoch(safeStore, txnId, prevLowEpoch, lowEpoch) || hasNewHighEpoch(safeStore, txnId, prevHighEpoch, highEpoch))
    {
        // update round counters because we have changed the epochs involved
        Route<?> prevSlicedRoute = slicedRoute(safeStore, txnId, route, prevLowEpoch, prevHighEpoch);
        Route<?> prevAwaitRoute = awaitRoute(prevSlicedRoute, blockedUntil);
        int prevRoundSize = awaitRoundSize(prevAwaitRoute);
        int prevRoundIndex = awaitRoundIndex(prevRoundSize);
        // element offset within prevAwaitRoute at which the previously pending round began
        int prevRoundStart = prevRoundIndex * prevRoundSize;
        int newRoundIndex = -1;
        // Resume the search from prevRoundStart, which is exactly the offset the guard bounds-checks.
        // Adding the round counter (prevRoundIndex) on top of the element offset would mix units,
        // start past keys that have not been awaited yet, and could start beyond the end of prevAwaitRoute.
        if (prevRoundStart < prevAwaitRoute.size())
            newRoundIndex = (int)awaitRoute.findNextSameKindIntersection(0, (Unseekables)prevAwaitRoute, prevRoundStart);
        // no match found (or the previous route was already exhausted): treat the new await route as fully processed
        if (newRoundIndex < 0)
            newRoundIndex = awaitRoute.size();
        updateAwaitRound(newRoundIndex, roundSize);
    }

    int roundIndex = awaitRoundIndex(roundSize);
    int roundStart = roundIndex * roundSize;
    if (roundStart >= awaitRoute.size())
    {
        // all of the shards we are awaiting have been processed and found at least one replica that has the state needed to answer our query
        // at this point we can switch to polling as we know someone has the relevant state
        fetch(owner, blockedUntil, txnId, executeAt, lowEpoch, highEpoch, slicedRoute, fetchRoute, route);
        return;
    }

    // clamp the final round so that it does not run past the end of the await route
    int roundEnd = Math.min(roundStart + roundSize, awaitRoute.size());
    awaitRoute = awaitRoute.slice(roundStart, roundEnd);
    // TODO (desired): use some mechanism (e.g. random chance or another counter)
    //   to either periodically fetch the whole remaining route or gradually increase the slice length
    // the last argument packs the round index into the upper bits with the lowest bit set
    // (presumably a callback/round identifier - confirm against awaitSlice)
    awaitSlice(owner, blockedUntil, txnId, executeAt, awaitRoute, (roundIndex << 1) | 1);
}