public void runUntilAllBlocked()

in src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java [204:291]


  /**
   * Runs the workflow threads held in {@code threads} until a full pass makes no progress or no
   * threads remain.
   *
   * <p>Each pass does the following while holding {@code lock}:
   *
   * <ol>
   *   <li>clears {@code threadsToAdd};
   *   <li>wraps every entry of {@code toExecuteInWorkflowThread} in a new {@code
   *       WorkflowThreadImpl}, puts those threads at the head of {@code threads} in their original
   *       order, and clears {@code toExecuteInWorkflowThread};
   *   <li>calls {@code runUntilBlocked()} on every thread in order, removing the ones that report
   *       {@code isDone()} and tracking the smallest future {@code getBlockedUntil()} value in
   *       {@code nextWakeUpTime};
   *   <li>appends whatever {@code threadsToAdd} contains at the end of the pass to {@code threads}.
   * </ol>
   *
   * <p>If {@code exitRequested} is observed after a thread ran, {@code close()} is called and the
   * loop is abandoned immediately. After the loop, {@code nextWakeUpTime} is reset to 0 when it is
   * already in the past or when no thread reported a future wake-up time.
   *
   * <p>On every exit path {@code inRunUntilAllBlocked} is cleared, {@code close()} is called if
   * {@code closeRequested} is set, and the lock is released.
   *
   * @throws Throwable the unhandled exception of the first finished thread that reports one (after
   *     {@code close()} has been called), or anything thrown by {@code checkClosed()}, by a
   *     thread's {@code runUntilBlocked()}, or by {@code close()}
   */
  public void runUntilAllBlocked() throws Throwable {
    lock.lock();
    try {
      checkClosed();

      inRunUntilAllBlocked = true;
      Throwable unhandledException = null;
      // Keep repeating while at least one of the threads makes progress.
      boolean progress;
      outerLoop:
      do {
        threadsToAdd.clear();

        if (!toExecuteInWorkflowThread.isEmpty()) {
          List<WorkflowThread> callbackThreads = new ArrayList<>(toExecuteInWorkflowThread.size());
          for (NamedRunnable nr : toExecuteInWorkflowThread) {
            WorkflowThread thread =
                new WorkflowThreadImpl(
                    false,
                    threadPool,
                    this,
                    nr.name,
                    false,
                    runnerCancellationScope,
                    nr.runnable,
                    cache,
                    getContextPropagators(),
                    getPropagatedContexts());
            callbackThreads.add(thread);
          }

          // It is important to prepend threads as there are callbacks
          // like signals that have to run before any other threads.
          // Otherwise signal might be never processed if it was received
          // after workflow decided to close.
          // Adding the callbacks in the same order as they appear in history:
          // walking the list backwards while inserting at the head keeps that order.

          for (int i = callbackThreads.size() - 1; i >= 0; i--) {
            threads.addFirst(callbackThreads.get(i));
          }
        }

        toExecuteInWorkflowThread.clear();
        progress = false;
        Iterator<WorkflowThread> ci = threads.iterator();
        // Recomputed from scratch on every pass.
        nextWakeUpTime = Long.MAX_VALUE;
        while (ci.hasNext()) {
          WorkflowThread c = ci.next();
          // The thread is always run; "|| progress" only accumulates the result.
          progress = c.runUntilBlocked() || progress;
          if (exitRequested) {
            // NOTE(review): close() is invoked while inRunUntilAllBlocked is still true;
            // presumably it only records the request in that state - confirm against close().
            close();
            break outerLoop;
          }
          if (c.isDone()) {
            ci.remove();
            if (c.getUnhandledException() != null) {
              // Stop the pass at the first failed thread; it is rethrown below.
              unhandledException = c.getUnhandledException();
              break;
            }
          } else {
            // Track the earliest wake-up time that is still in the future.
            long t = c.getBlockedUntil();
            if (t > currentTimeMillis() && t < nextWakeUpTime) {
              nextWakeUpTime = t;
            }
          }
        }
        if (unhandledException != null) {
          close();
          throw unhandledException;
        }
        // Appended only after the iteration above has finished.
        for (WorkflowThread c : threadsToAdd) {
          threads.addLast(c);
        }
      } while (progress && !threads.isEmpty());

      // 0 is stored when there is no wake-up time in the future.
      if (nextWakeUpTime < currentTimeMillis() || nextWakeUpTime == Long.MAX_VALUE) {
        nextWakeUpTime = 0;
      }

    } finally {
      try {
        inRunUntilAllBlocked = false;
        // Close was requested while running
        if (closeRequested) {
          close();
        }
      } finally {
        // Release the lock even when close() throws; otherwise this thread would never
        // release it and later callers of lock.lock() would block.
        lock.unlock();
      }
    }
  }