private void execProcedure()

in hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java [1806:1953]


  /**
   * Runs one or more execution steps of {@code procedure} and then performs the follow-up
   * bookkeeping: persisting the result to the store, submitting any child procedures, releasing
   * the procedure lock and, for a finished child, counting down its parent's children latch.
   * <p>
   * Each pass of the loop calls {@code beforeExec}, {@code doExecute} and {@code afterExec} on the
   * procedure. The outcome of {@code doExecute} decides what happens next:
   * <ul>
   * <li>{@link ProcedureSuspendedException}: the procedure is marked as suspended; its state is
   * left alone but the store is still updated if the procedure asks for persistence.</li>
   * <li>{@link ProcedureYieldException} or {@link InterruptedException}: the procedure is yielded
   * and this method returns immediately, without touching the store.</li>
   * <li>Any other {@link Throwable}: recorded on the procedure as a failure.</li>
   * <li>An array holding only the procedure itself: the loop runs again right away instead of
   * going back through the scheduler.</li>
   * <li>Any other non-empty array: the entries are initialized as children and submitted after
   * the loop.</li>
   * <li>No sub-procedures: the procedure is handed to the timeout executor when it is in
   * {@code WAITING_TIMEOUT}, otherwise (and when not suspended) it is marked {@code SUCCESS}.</li>
   * </ul>
   * @param procStack state shared by every procedure under the same root procedure; used as the
   *                  monitor while a rollback step is recorded
   * @param procedure the procedure to execute; must be in the {@code RUNNABLE} state
   */
  private void execProcedure(RootProcedureState<TEnvironment> procStack,
    Procedure<TEnvironment> procedure) {
    // Use the template form so the (potentially costly) procedure.toString() is only evaluated
    // when the check actually fails, not on every execution step.
    Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
      "NOT RUNNABLE! %s", procedure);

    // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.
    // The exception is caught below and then we hurry to the exit without disturbing state. The
    // idea is that the processing of this procedure will be unsuspended later by an external event
    // such as the report of a region open.
    boolean suspended = false;

    // Whether to 're-' -execute; run through the loop again.
    boolean reExecute = false;

    Procedure<TEnvironment>[] subprocs = null;
    do {
      reExecute = false;
      procedure.resetPersistence();
      try {
        procedure.beforeExec(getEnvironment());
        subprocs = procedure.doExecute(getEnvironment());
        // Treat an empty array the same as 'no sub-procedures'.
        if (subprocs != null && subprocs.length == 0) {
          subprocs = null;
        }
      } catch (ProcedureSuspendedException e) {
        LOG.trace("Suspend {}", procedure);
        suspended = true;
      } catch (ProcedureYieldException e) {
        LOG.trace("Yield {}", procedure, e);
        procedure.afterExec(getEnvironment());
        yieldProcedure(procedure);
        return;
      } catch (InterruptedException e) {
        LOG.trace("Yield interrupt {}", procedure, e);
        handleInterruptedException(procedure, e);
        procedure.afterExec(getEnvironment());
        yieldProcedure(procedure);
        return;
      } catch (Throwable e) {
        // Catch NullPointerExceptions or similar errors...
        String msg = "CODE-BUG: Uncaught runtime exception: " + procedure;
        LOG.error(msg, e);
        procedure.setFailure(new RemoteProcedureException(msg, e));
      }

      if (!procedure.isFailed()) {
        if (subprocs != null) {
          if (subprocs.length == 1 && subprocs[0] == procedure) {
            // Procedure returned itself. Quick-shortcut for a state machine-like procedure;
            // i.e. we go around this loop again rather than go back out on the scheduler queue.
            subprocs = null;
            reExecute = true;
            LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId());
          } else {
            // Yield the current procedure, and make the subprocedure runnable
            // subprocs may come back 'null'.
            subprocs = initializeChildren(procStack, procedure, subprocs);
            // Only pay for rendering every child when the message will actually be emitted.
            if (LOG.isInfoEnabled()) {
              String initialized = subprocs == null
                ? null
                : Stream.of(subprocs).map(e -> "{" + e.toString() + "}")
                  .collect(Collectors.toList()).toString();
              LOG.info("Initialized subprocedures={}", initialized);
            }
          }
        } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
          LOG.trace("Added to timeoutExecutor {}", procedure);
          timeoutExecutor.add(procedure);
        } else if (!suspended) {
          // No subtask, so we are done
          procedure.setState(ProcedureState.SUCCESS);
        }
      }

      // allows to kill the executor before something is stored to the wal.
      // useful to test the procedure recovery.
      if (
        testing != null && testing.shouldKillBeforeStoreUpdate(suspended, procedure.hasParent())
      ) {
        kill("TESTING: Kill BEFORE store update: " + procedure);
      }

      // TODO: The code here doesn't check if store is running before persisting to the store as
      // it relies on the method call below to throw RuntimeException to wind up the stack and
      // executor thread to stop. The statement following the method call below seems to check if
      // store is not running, to prevent scheduling children procedures, re-execution or yield
      // of this procedure. This may need more scrutiny and subsequent cleanup in future
      //
      // Commit the transaction even if a suspend (state may have changed). Note this append
      // can take a bunch of time to complete.
      if (procedure.needPersistence()) {
        // Add the procedure to the stack
        // See HBASE-28210 on why we need synchronized here
        boolean needUpdateStoreOutsideLock = false;
        synchronized (procStack) {
          if (procStack.addRollbackStep(procedure)) {
            updateStoreOnExec(procStack, procedure, subprocs);
          } else {
            needUpdateStoreOutsideLock = true;
          }
        }
        // this is an optimization if we do not need to maintain rollback step, as all subprocedures
        // of the same root procedure share the same root procedure state, if we can only update
        // store under the above lock, the sub procedures of the same root procedure can only be
        // persistent sequentially, which will have a bad performance. See HBASE-28212 for more
        // details.
        if (needUpdateStoreOutsideLock) {
          updateStoreOnExec(procStack, procedure, subprocs);
        }
      }
      procedure.afterExec(getEnvironment());

      // if the store is not running we are aborting
      if (!store.isRunning()) {
        return;
      }
      // if the procedure is kind enough to pass the slot to someone else, yield
      if (
        procedure.isRunnable() && !suspended
          && procedure.isYieldAfterExecutionStep(getEnvironment())
      ) {
        yieldProcedure(procedure);
        return;
      }

      // A re-execution is only ever requested after subprocs has been cleared above.
      assert !reExecute || subprocs == null;
    } while (reExecute);

    // Allows to kill the executor after something is stored to the WAL but before the below
    // state settings are done -- in particular the one on the end where we make parent
    // RUNNABLE again when its children are done; see countDownChildren.
    if (testing != null && testing.shouldKillAfterStoreUpdate(suspended)) {
      kill("TESTING: Kill AFTER store update: " + procedure);
    }

    // Submit the new subprocedures
    if (subprocs != null && !procedure.isFailed()) {
      submitChildrenProcedures(subprocs);
    }

    // we need to log the release lock operation before waking up the parent procedure, as there
    // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all
    // the sub procedures from store and cause problems...
    releaseLock(procedure, false);

    // if the procedure is complete and has a parent, count down the children latch.
    // If 'suspended', do nothing to change state -- let other threads handle unsuspend event.
    if (!suspended && procedure.isFinished() && procedure.hasParent()) {
      countDownChildren(procStack, procedure);
    }
  }