in core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java [182:314]
/**
 * Signals the node located at the given execution path and drives the job forward from there.
 * <p>
 * A node that has not been started yet is entered first (after loop detection). When the handler reports that
 * the node is exiting, its transitions are resolved: either the job is completed with the status recorded in
 * the node context, or every transition that is not already registered is stored as a new execution path and
 * signalled recursively with the {@code "::synch::"} value. Whenever the job ends up in an end state,
 * {@code terminateNodes} is invoked before returning.
 *
 * @param executionPath execution path of the node to signal, checked with {@code ParamChecker.notEmpty}.
 * @param signalValue signal value passed to the node context, must not be {@code null}.
 * @return {@code true} if the job is in an end state after the signal has been processed, {@code false}
 *         otherwise.
 * @throws WorkflowException with {@code ErrorCode.E0716} if the job is not {@code RUNNING} when signalled, or
 *         the exception raised while entering or exiting the node (the job status is set to {@code FAILED}
 *         before it is rethrown).
 */
public synchronized boolean signal(String executionPath, String signalValue) throws WorkflowException {
    ParamChecker.notEmpty(executionPath, "executionPath");
    Objects.requireNonNull(signalValue, "signalValue cannot be null");
    if (status != Status.RUNNING) {
        throw new WorkflowException(ErrorCode.E0716);
    }

    final NodeInstance current = executionPaths.get(executionPath);
    log.debug(XLog.STD, "Signaling job execution path [{0}] signal value [{1}] for node [{2}]", executionPath,
            signalValue, (current != null ? current.nodeName : null));
    if (current == null) {
        // nothing is registered under this path, the job cannot go on
        status = Status.FAILED;
        log.error("invalid execution path [{0}]", executionPath);
    }

    NodeDef definition = null;
    if (!status.isEndState()) {
        definition = def.getNode(current.nodeName);
        if (definition == null) {
            // the path points to a node the workflow definition does not know
            status = Status.FAILED;
            log.error("invalid transition [{0}]", current.nodeName);
        }
    }

    if (!status.isEndState()) {
        final NodeHandler handler = newInstance(definition.getHandlerClass());
        boolean leaving = true;
        final Context ctx = new Context(definition, executionPath, signalValue);

        if (!current.started) {
            try {
                handler.loopDetection(ctx);
                leaving = handler.enter(ctx);
                current.started = true;
            }
            catch (WorkflowException ex) {
                // entering the node failed: fail the job, terminate the nodes as KILLED and propagate
                status = Status.FAILED;
                final List<String> terminated = terminateNodes(Status.KILLED);
                final int terminatedCount = terminated.size();
                if (terminatedCount > 1) {
                    log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status,
                            terminatedCount);
                }
                throw ex;
            }
        }

        if (leaving) {
            final List<String> newPaths = new ArrayList<>();
            List<String> transitions;
            try {
                transitions = handler.multiExit(ctx);
                // NOTE(review): the original code carried a "TEST THIS" marker on the check below
                final int transitionCount = transitions.size();
                if (transitionCount > 0) {
                    // remember where the node transitioned to, based on the last transition returned
                    String transitionTo = getTransitionNode(transitions.get(transitionCount - 1));
                    if (definition instanceof ForkNodeDef) {
                        // A WF action cannot hold all the transitions of a fork, they are hardcoded in
                        // the WF app, so a wildcard is recorded instead.
                        transitionTo = "*";
                    }
                    final String transitionVar = definition.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
                            + TRANSITION_TO;
                    persistentVars.put(transitionVar, transitionTo);
                }
            }
            catch (WorkflowException ex) {
                status = Status.FAILED;
                throw ex;
            }

            final boolean jobKeepsRunning = ctx.status != Status.KILLED && ctx.status != Status.FAILED
                    && ctx.status != Status.SUCCEEDED;
            if (jobKeepsRunning) {
                for (String fullTransition : transitions) {
                    // A full transition carries both the execution path and the node to move to; this is
                    // what makes forking work. Without forking the last element of the execution path
                    // differs from the transition, with forking they are the same.
                    log.debug(XLog.STD, "Exiting node [{0}] with transition[{1}]", current.nodeName,
                            fullTransition);
                    final String targetPath = getExecutionPath(fullTransition);
                    final String targetNode = getTransitionNode(fullTransition);
                    def.validateTransition(current.nodeName, targetNode);

                    final NodeInstance registered = executionPaths.get(targetPath);
                    final boolean alreadyRegistered = registered != null
                            && targetNode.equals(registered.nodeName);
                    // TODO explain this check better. Per the original note: if the job is signalled with
                    // the parent execution path again, the fork node would execute again and replace the
                    // node instance already stored for the path; skipping paths that already hold the
                    // target node prevents that. Open question: should this case raise an error instead?
                    if (!alreadyRegistered) {
                        executionPaths.put(targetPath, new NodeInstance(targetNode));
                        newPaths.add(targetPath);
                    }
                }
                // signal all the new synch transitions
                for (String newPath : newPaths) {
                    signal(newPath, "::synch::");
                }
            }
            else if (ctx.status == Status.KILLED) {
                status = Status.KILLED;
                log.debug(XLog.STD, "Completing job, kill node [{0}]", current.nodeName);
            }
            else if (ctx.status == Status.FAILED) {
                status = Status.FAILED;
                log.debug(XLog.STD, "Completing job, fail node [{0}]", current.nodeName);
            }
            else {
                // only SUCCEEDED is left at this point
                status = Status.SUCCEEDED;
                log.debug(XLog.STD, "Completing job, end node [{0}]", current.nodeName);
            }
        }
    }

    if (status.isEndState()) {
        if (status != Status.FAILED) {
            // any other end state terminates the nodes as KILLED, reported only for more than one node
            final List<String> terminated = terminateNodes(Status.KILLED);
            final int terminatedCount = terminated.size();
            if (terminatedCount > 1) {
                log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status,
                        terminatedCount);
            }
        }
        else {
            // a failed job terminates the nodes with its own status and always reports it
            final List<String> terminated = terminateNodes(status);
            log.warn(XLog.STD, "Workflow completed [{0}], failing [{1}] running nodes", status,
                    terminated.size());
        }
    }
    return status.isEndState();
}