in maestro-engine/src/main/java/com/netflix/maestro/engine/processors/StepInstanceWakeUpEventProcessor.java [74:152]
/**
 * Handles a wake-up event that targets a single step instance.
 *
 * <p>A RESTART action always wakes up the underlying task. For a leaf step, the underlying task
 * is woken up only when the step status reports {@code shouldWakeup()}. For FOREACH and
 * SUBWORKFLOW steps, the rollup overview stored in the matching step artifact is handed to
 * {@code handleLeafTasksWakeup}; if the desired status is terminal, a retryable error is thrown
 * afterwards so that the status is checked again in a following retry.
 *
 * @param jobEvent the wake-up event to process
 * @throws MaestroRetryableError if a FOREACH or SUBWORKFLOW step with a matching artifact is not
 *     yet in the desired terminal status
 */
private void processForStepEntity(StepInstanceWakeUpEvent jobEvent) {
  // A restart wakes up the underlying task without looking at step type or status.
  if (jobEvent.getStepAction() == Actions.StepInstanceAction.RESTART) {
    wakeupUnderlyingTask(jobEvent);
    return;
  }

  // Simple case: the event itself says this is a leaf step, so no step instance lookup is done.
  boolean leafPerEvent = jobEvent.getStepType() != null && jobEvent.getStepType().isLeaf();
  if (leafPerEvent) {
    boolean wakeupPerEvent =
        jobEvent.getStepStatus() != null && jobEvent.getStepStatus().shouldWakeup();
    if (wakeupPerEvent) {
      wakeupUnderlyingTask(jobEvent);
    }
    return;
  }

  // The event does not identify a leaf step: load the step instance to decide.
  StepInstance instance =
      stepInstanceDao.getStepInstance(
          jobEvent.getWorkflowId(),
          jobEvent.getWorkflowInstanceId(),
          jobEvent.getWorkflowRunId(),
          jobEvent.getStepId(),
          jobEvent.getStepAttemptId());
  if (instance == null) {
    LOG.warn(
        "Action is requested on an invalid step instance. The requested action is: {}",
        jobEvent.getMessageKey());
    return;
  }

  // The loaded step definition may still turn out to be a leaf step.
  if (instance.getDefinition().getType().isLeaf()) {
    if (instance.getRuntimeState().getStatus().shouldWakeup()) {
      wakeupUnderlyingTask(jobEvent);
    }
    return;
  }

  // Non-leaf step: the action must be present to derive the status we are waiting for.
  StepInstance.Status targetStatus =
      Checks.notNull(
              jobEvent.getStepAction(),
              "the step action cannot be null for a step action job event {}",
              jobEvent.getMessageKey())
          .getStatus();
  if (instance.getRuntimeState().getStatus() == targetStatus) {
    // Already in the desired status, nothing left to do.
    return;
  }
  if (instance.getArtifacts() == null) {
    // No artifacts, hence no rollup overview to act on.
    return;
  }

  // Set once the rollup overview of the matching artifact has been passed on for wake-up.
  boolean leafTasksHandled = false;
  switch (instance.getDefinition().getType()) {
    case FOREACH:
      if (instance.getArtifacts().containsKey(Artifact.Type.FOREACH.key())) {
        ForeachArtifact foreachData =
            instance.getArtifacts().get(Artifact.Type.FOREACH.key()).asForeach();
        handleLeafTasksWakeup(
            jobEvent.getGroupInfo(), foreachData.getForeachOverview().getOverallRollup());
        leafTasksHandled = true;
      }
      break;
    case SUBWORKFLOW:
      if (instance.getArtifacts().containsKey(Artifact.Type.SUBWORKFLOW.key())) {
        SubworkflowArtifact subworkflowData =
            instance.getArtifacts().get(Artifact.Type.SUBWORKFLOW.key()).asSubworkflow();
        handleLeafTasksWakeup(
            jobEvent.getGroupInfo(),
            subworkflowData.getSubworkflowOverview().getRollupOverview());
        leafTasksHandled = true;
      }
      break;
    default:
      LOG.warn(
          "Invalid step type to be processed for this action. Action is {} and the step type is: {}",
          jobEvent.getMessageKey(),
          instance.getDefinition().getType());
      return;
  }

  // The desired terminal status has to be verified again in a following retry.
  if (leafTasksHandled && targetStatus.isTerminal()) {
    throw new MaestroRetryableError(
        "Current status is not the desired status after action is taking. Will check again");
  }
}