in accord-core/src/main/java/accord/impl/progresslog/WaitingState.java [484:611]
/**
 * Completion callback for a distributed await/fetch query issued on behalf of the {@code WaitingState}
 * for {@code txnId}, which is blocked until the remote condition {@code blockedUntil} is met.
 *
 * On success ({@code fail == null}) the reply partitions the queried keys/ranges into {@code ready}
 * (already satisfying {@code blockedUntil}) and {@code notReady} (still pending; {@code null} when
 * everything was ready, or — for an await — when no distributed await was registered). Depending on
 * {@code kind} we either advance the state machine immediately, record that we are {@code Awaiting}
 * a remote callback, or set up the next polling round / queue a retry.
 *
 * On failure we report the exception to the agent and queue a retry.
 *
 * @param kind         which query this callback answers (AwaitHome, AwaitSlice, FetchRoute or Fetch)
 * @param blockedUntil the condition the state was querying for; must still match the state's own
 * @param ready        keys/ranges the remote reported as already satisfying {@code blockedUntil}
 * @param notReady     keys/ranges not yet satisfying it, or {@code null} when all were ready
 * @param upgrade      optionally a stronger condition the home shard is known to satisfy
 * @param fail         non-null if the query failed; triggers a retry
 */
static void awaitOrFetchCallback(CallbackKind kind, SafeCommandStore safeStore, SafeCommand safeCommand, DefaultProgressLog owner, TxnId txnId, BlockedUntil blockedUntil, Unseekables<?> ready, @Nullable Unseekables<?> notReady, @Nullable BlockedUntil upgrade, Throwable fail)
{
// Sanity: a callback must be cancelled before its state is cleared, and the state must still be
// in the Querying phase for the very same condition we queried for.
WaitingState state = owner.get(txnId);
Invariants.require(state != null, "State has been cleared but callback was not cancelled");
Invariants.require(state.waitingProgress() == Querying);
Invariants.require(state.blockedUntil() == blockedUntil);
Command command = safeCommand.current();
Route<?> route = command.route();
if (fail == null)
{
if (route == null)
{
// We still don't know the route locally; only a FetchRoute query may be outstanding in that
// case (the fetch presumably failed to learn it), so simply queue another attempt.
Invariants.require(kind == FetchRoute);
state.retry(safeStore, safeCommand, owner, blockedUntil);
return;
}
// The home key answered ready: record the strongest condition it is known to satisfy,
// folding in any stronger condition ("upgrade") the reply reported.
if (ready.contains(route.homeKey()) && blockedUntil.compareTo(state.homeSatisfies()) > 0)
{
// TODO (expected): we can introduce an additional home state check that waits until DURABLE for execution;
// at this point it would even be redundant to wait for each separate shard for the WaitingState? Freeing up bits and simplifying.
BlockedUntil newHomeSatisfies = blockedUntil;
if (upgrade != null && upgrade.compareTo(newHomeSatisfies) > 0)
newHomeSatisfies = upgrade;
state.setHomeSatisfies(newHomeSatisfies);
}
// Recompute the portion of the route this store cares about, and the polling-round geometry
// used to await/fetch it a bounded number of keys at a time.
long fromLocalEpoch = state.readLowEpoch(safeStore, txnId, route);
long toLocalEpoch = state.readHighEpoch(safeStore, txnId, route);
Route<?> slicedRoute = slicedRoute(safeStore, txnId, route, fromLocalEpoch, toLocalEpoch); // the actual local keys we care about
Route<?> awaitRoute = awaitRoute(slicedRoute, blockedUntil); // either slicedRoute or just the home key
int roundSize = awaitRoundSize(awaitRoute);
int roundIndex = state.awaitRoundIndex(roundSize);
int roundStart = roundIndex * roundSize;
switch (kind)
{
default: throw new UnhandledEnum(kind);
case AwaitHome:
if (notReady == null)
{
// the home shard was found to already have the necessary state, with no distributed await;
// we can immediately progress the state machine
Invariants.require(0 == state.awaitRoundIndex(roundSize));
Invariants.require(0 == state.awaitBitSet(roundSize));
state.runInternal(safeStore, safeCommand, owner);
}
else
{
// the home shard is not ready to answer our query, but we have registered our remote callback so can wait for it to contact us
state.set(safeStore, owner, blockedUntil, Awaiting);
}
break;
case AwaitSlice:
Invariants.require(awaitRoute == slicedRoute);
// In a production system it is safe for the roundIndex to get corrupted as we will just start polling a bit early,
// but for testing we would like to know it has happened.
// NOTE(review): the break below sits INSIDE the braces, so a failed expectation deliberately
// falls through to the FetchRoute/Fetch handling below — confirm this is the intended recovery.
if (Invariants.expect(roundStart < roundSize))
{
if (notReady == null)
{
// Everything in this round is ready: advance to the next round and re-run the state machine.
Invariants.expect((int) awaitRoute.findNextSameKindIntersection(roundStart, (Unseekables) ready, 0) / roundSize == roundIndex);
// TODO (desired): in this case perhaps upgrade to fetch for next round?
state.updateAwaitRound(roundIndex + 1, roundSize);
state.runInternal(safeStore, safeCommand, owner);
}
else
{
// Some keys in this round are pending: remember which, and wait for the remote callback.
Invariants.expect((int) awaitRoute.findNextSameKindIntersection(roundStart, (Unseekables) notReady, 0) / roundSize == roundIndex);
// TODO (desired): would be nice to validate this is 0 in cases where we are starting a fresh round
// but have to be careful as cannot zero when we restart as we may have an async callback arrive while we're waiting that then advances state machine
state.initialiseAwaitBitSet(awaitRoute, notReady, roundIndex, roundSize);
state.set(safeStore, owner, blockedUntil, Awaiting);
}
break;
}
case FetchRoute:
// The home shard does not yet satisfy our condition: re-run the state machine
// (presumably to query the home shard first) rather than fetching the slice now.
if (state.homeSatisfies().compareTo(blockedUntil) < 0)
{
state.runInternal(safeStore, safeCommand, owner);
return;
}
// we may not have requested everything since we didn't have a Route, so calculate our own notReady and fall-through
notReady = slicedRoute.without(ready);
case Fetch:
{
Invariants.require(notReady != null, "Fetch was successful for all keys, but the WaitingState has not been cleared");
Invariants.require(notReady.intersects(slicedRoute), "Fetch was successful for all keys, but the WaitingState has not been cleared");
// Locate the first still-pending key at or after this round's start; when awaitRoute is
// the home-key-only route (awaitRoute != slicedRoute) there is only a single round.
int nextIndex;
if (roundStart >= awaitRoute.size()) nextIndex = -1;
else if (slicedRoute == awaitRoute) nextIndex = (int) awaitRoute.findNextSameKindIntersection(roundStart, (Unseekables) notReady, 0);
else
{
Invariants.require(roundIndex == 0);
nextIndex = 0;
}
if (nextIndex < 0)
{
// we don't think we have anything to wait for, but we have encountered some notReady responses; queue up a retry
state.setAwaitDone(roundSize);
state.retry(safeStore, safeCommand, owner, blockedUntil);
}
else
{
// Jump to the round containing the first pending key, seed its bitset with the
// outstanding keys, and re-run the state machine to issue the next query.
Invariants.require(nextIndex >= roundStart);
roundIndex = nextIndex / roundSize;
state.updateAwaitRound(roundIndex, roundSize);
state.initialiseAwaitBitSet(awaitRoute, notReady, roundIndex, roundSize);
state.runInternal(safeStore, safeCommand, owner);
}
}
}
}
else
{
// The query itself failed: surface the exception to the agent for visibility, then retry later.
safeStore.agent().onCaughtException(fail, "Failed fetching data for " + state);
state.retry(safeStore, safeCommand, owner, blockedUntil);
}
}