in hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java [1806:1953]
/**
 * Runs one or more execution steps of a RUNNABLE procedure on the calling thread.
 * <p>
 * Each pass of the loop does the following:
 * <ol>
 * <li>calls {@code beforeExec} and {@code doExecute};</li>
 * <li>decides the procedure's next state from the outcome;</li>
 * <li>writes the result to the store when {@code needPersistence()} is true;</li>
 * <li>calls {@code afterExec}.</li>
 * </ol>
 * The loop repeats only when {@code doExecute} returned an array containing just the procedure
 * itself. This is a state-machine style shortcut that runs the next step immediately instead of
 * going back through the scheduler queue.
 * <p>
 * Outcomes of {@code doExecute}:
 * <ul>
 * <li>{@link ProcedureSuspendedException}: the procedure is marked suspended. The store update
 * still runs (if needed), but no state transition, child submission or parent count-down happens
 * here.</li>
 * <li>{@link ProcedureYieldException} / {@link InterruptedException}: {@code afterExec} is called,
 * the procedure is passed to {@code yieldProcedure}, and the method returns immediately. The store
 * update for that step is skipped.</li>
 * <li>any other {@link Throwable}: logged as a CODE-BUG and recorded on the procedure via
 * {@code setFailure}.</li>
 * <li>an array of other procedures: passed through {@code initializeChildren}. After the store
 * update, the result is submitted with {@code submitChildrenProcedures}.</li>
 * <li>{@code null} or an empty array: the procedure is set to SUCCESS, unless it is in
 * WAITING_TIMEOUT (it is then added to the timeout executor) or was suspended.</li>
 * </ul>
 * If the store is found not running after a step, the method returns early. Otherwise, after the
 * loop the procedure lock is released. If the procedure finished, was not suspended and has a
 * parent, the parent's children latch is counted down via {@code countDownChildren}.
 * @param procStack state shared by all procedures of the same root procedure. It is also the
 *          monitor guarding the rollback-step bookkeeping below.
 * @param procedure the procedure to execute; must be in {@link ProcedureState#RUNNABLE}
 * @throws IllegalArgumentException presumably (Guava-style {@code Preconditions.checkArgument}),
 *           if the procedure is not RUNNABLE
 */
private void execProcedure(RootProcedureState<TEnvironment> procStack,
Procedure<TEnvironment> procedure) {
Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
"NOT RUNNABLE! " + procedure.toString());
// Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.
// The exception is caught below and then we hurry to the exit without disturbing state. The
// idea is that the processing of this procedure will be unsuspended later by an external event
// such as the report of a region open.
boolean suspended = false;
// Whether to re-execute, i.e. run through the loop again. This is set only by the
// "procedure returned itself" shortcut below.
boolean reExecute = false;
// Children returned by the latest doExecute() call; null when there are none.
Procedure<TEnvironment>[] subprocs = null;
do {
reExecute = false;
// NOTE(review): presumably clears the flag later read via needPersistence(), so that each
// step decides afresh whether it must be written to the store.
procedure.resetPersistence();
try {
procedure.beforeExec(getEnvironment());
subprocs = procedure.doExecute(getEnvironment());
// Treat "no children" uniformly as null so later code only needs a null check.
if (subprocs != null && subprocs.length == 0) {
subprocs = null;
}
} catch (ProcedureSuspendedException e) {
// Suspension is not an error. The store update below still runs if needed, but no state
// transition happens for this step.
LOG.trace("Suspend {}", procedure);
suspended = true;
} catch (ProcedureYieldException e) {
// Yield: hand the procedure to yieldProcedure and leave now. The store update below is
// skipped for this step.
LOG.trace("Yield {}", procedure, e);
procedure.afterExec(getEnvironment());
yieldProcedure(procedure);
return;
} catch (InterruptedException e) {
// Interrupted: let handleInterruptedException react, then yield exactly as above.
LOG.trace("Yield interrupt {}", procedure, e);
handleInterruptedException(procedure, e);
procedure.afterExec(getEnvironment());
yieldProcedure(procedure);
return;
} catch (Throwable e) {
// Catch NullPointerExceptions or similar errors...
// Any other throwable is treated as a bug in the procedure and recorded as its failure.
String msg = "CODE-BUG: Uncaught runtime exception: " + procedure;
LOG.error(msg, e);
procedure.setFailure(new RemoteProcedureException(msg, e));
}
// Only decide the next state when this step did not fail.
if (!procedure.isFailed()) {
if (subprocs != null) {
if (subprocs.length == 1 && subprocs[0] == procedure) {
// Procedure returned itself. Quick-shortcut for a state machine-like procedure;
// i.e. we go around this loop again rather than go back out on the scheduler queue.
subprocs = null;
reExecute = true;
LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId());
} else {
// Yield the current procedure, and make the subprocedure runnable
// subprocs may come back 'null'.
subprocs = initializeChildren(procStack, procedure, subprocs);
LOG.info("Initialized subprocedures=" + (subprocs == null
? null
: Stream.of(subprocs).map(e -> "{" + e.toString() + "}").collect(Collectors.toList())
.toString()));
}
} else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
// No children, but the procedure is waiting on a timeout: hand it to the timeout executor.
LOG.trace("Added to timeoutExecutor {}", procedure);
timeoutExecutor.add(procedure);
} else if (!suspended) {
// No subtask, so we are done
procedure.setState(ProcedureState.SUCCESS);
}
}
// allows to kill the executor before something is stored to the wal.
// useful to test the procedure recovery.
if (
testing != null && testing.shouldKillBeforeStoreUpdate(suspended, procedure.hasParent())
) {
kill("TESTING: Kill BEFORE store update: " + procedure);
}
// TODO: The code here doesn't check if store is running before persisting to the store as
// it relies on the method call below to throw RuntimeException to wind up the stack and
// executor thread to stop. The statement following the method call below seems to check if
// store is not running, to prevent scheduling children procedures, re-execution or yield
// of this procedure. This may need more scrutiny and subsequent cleanup in the future.
//
// Commit the transaction even if a suspend (state may have changed). Note this append
// can take a bunch of time to complete.
if (procedure.needPersistence()) {
// Add the procedure to the stack
// See HBASE-28210 on why we need synchronized here
boolean needUpdateStoreOutsideLock = false;
synchronized (procStack) {
// If a rollback step was recorded, update the store while still holding the lock.
// Otherwise, defer the store update until after the lock is released.
if (procStack.addRollbackStep(procedure)) {
updateStoreOnExec(procStack, procedure, subprocs);
} else {
needUpdateStoreOutsideLock = true;
}
}
// This is an optimization for when we do not need to maintain a rollback step. All
// subprocedures of the same root procedure share the same root procedure state. If the store
// could only be updated under the above lock, the subprocedures of the same root procedure
// would be persisted sequentially, which would hurt performance. See HBASE-28212 for more
// details.
if (needUpdateStoreOutsideLock) {
updateStoreOnExec(procStack, procedure, subprocs);
}
}
procedure.afterExec(getEnvironment());
// if the store is not running we are aborting
// NOTE(review): this return also skips child submission, releaseLock() and the parent
// count-down below; presumably acceptable because the executor is shutting down.
if (!store.isRunning()) {
return;
}
// if the procedure is kind enough to pass the slot to someone else, yield
if (
procedure.isRunnable() && !suspended
&& procedure.isYieldAfterExecutionStep(getEnvironment())
) {
yieldProcedure(procedure);
return;
}
// Re-execution only happens via the self-return shortcut, which always clears subprocs.
assert (reExecute && subprocs == null) || !reExecute;
} while (reExecute);
// Allows to kill the executor after something is stored to the WAL but before the below
// state settings are done -- in particular the one on the end where we make parent
// RUNNABLE again when its children are done; see countDownChildren.
if (testing != null && testing.shouldKillAfterStoreUpdate(suspended)) {
kill("TESTING: Kill AFTER store update: " + procedure);
}
// Submit the new subprocedures (skipped if this step failed)
if (subprocs != null && !procedure.isFailed()) {
submitChildrenProcedures(subprocs);
}
// we need to log the release lock operation before waking up the parent procedure, as there
// could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all
// the sub procedures from store and cause problems...
releaseLock(procedure, false);
// if the procedure is complete and has a parent, count down the children latch.
// If 'suspended', do nothing to change state -- let other threads handle unsuspend event.
if (!suspended && procedure.isFinished() && procedure.hasParent()) {
countDownChildren(procStack, procedure);
}
}