in src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java [323:383]
public void close() {
List<Future<?>> threadFutures = new ArrayList<>();
lock.lock();
if (closed) {
lock.unlock();
return;
}
// Do not close while runUntilAllBlocked executes.
// closeRequested tells it to call close() at the end.
closeRequested = true;
if (inRunUntilAllBlocked) {
lock.unlock();
return;
}
try {
for (WorkflowThread c : threadsToAdd) {
threads.addLast(c);
}
threadsToAdd.clear();
for (WorkflowThread c : threads) {
threadFutures.add(c.stopNow());
}
threads.clear();
// We cannot use an iterator to unregister failed Promises since f.get()
// will remove the promise directly from failedPromises. This causes an
// ConcurrentModificationException
// For this reason we will loop over a copy of failedPromises.
Set<Promise> failedPromisesLoop = new HashSet<>(failedPromises);
for (Promise f : failedPromisesLoop) {
if (!f.isCompleted()) {
throw new Error("expected failed");
}
try {
f.get();
throw new Error("unreachable");
} catch (RuntimeException e) {
log.warn(
"Promise that was completedExceptionally was never accessed. "
+ "The ignored exception:",
CheckedExceptionWrapper.unwrap(e));
}
}
} finally {
closed = true;
lock.unlock();
}
// Context is destroyed in c.StopNow(). Wait on all tasks outside the lock since
// these tasks use the same lock to execute.
for (Future<?> future : threadFutures) {
try {
future.get();
} catch (InterruptedException e) {
throw new Error("Unexpected interrupt", e);
} catch (ExecutionException e) {
throw new Error("Unexpected failure stopping coroutine", e);
}
}
}