in src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java [204:291]
public void runUntilAllBlocked() throws Throwable {
lock.lock();
try {
checkClosed();
inRunUntilAllBlocked = true;
Throwable unhandledException = null;
// Keep repeating until at least one of the threads makes progress.
boolean progress;
outerLoop:
do {
threadsToAdd.clear();
if (!toExecuteInWorkflowThread.isEmpty()) {
List<WorkflowThread> callbackThreads = new ArrayList<>(toExecuteInWorkflowThread.size());
for (NamedRunnable nr : toExecuteInWorkflowThread) {
WorkflowThread thread =
new WorkflowThreadImpl(
false,
threadPool,
this,
nr.name,
false,
runnerCancellationScope,
nr.runnable,
cache,
getContextPropagators(),
getPropagatedContexts());
callbackThreads.add(thread);
}
// It is important to prepend threads as there are callbacks
// like signals that have to run before any other threads.
// Otherwise signal might be never processed if it was received
// after workflow decided to close.
// Adding the callbacks in the same order as they appear in history.
for (int i = callbackThreads.size() - 1; i >= 0; i--) {
threads.addFirst(callbackThreads.get(i));
}
}
toExecuteInWorkflowThread.clear();
progress = false;
Iterator<WorkflowThread> ci = threads.iterator();
nextWakeUpTime = Long.MAX_VALUE;
while (ci.hasNext()) {
WorkflowThread c = ci.next();
progress = c.runUntilBlocked() || progress;
if (exitRequested) {
close();
break outerLoop;
}
if (c.isDone()) {
ci.remove();
if (c.getUnhandledException() != null) {
unhandledException = c.getUnhandledException();
break;
}
} else {
long t = c.getBlockedUntil();
if (t > currentTimeMillis() && t < nextWakeUpTime) {
nextWakeUpTime = t;
}
}
}
if (unhandledException != null) {
close();
throw unhandledException;
}
for (WorkflowThread c : threadsToAdd) {
threads.addLast(c);
}
} while (progress && !threads.isEmpty());
if (nextWakeUpTime < currentTimeMillis() || nextWakeUpTime == Long.MAX_VALUE) {
nextWakeUpTime = 0;
}
} finally {
inRunUntilAllBlocked = false;
// Close was requested while running
if (closeRequested) {
close();
}
lock.unlock();
}
}