in maestro-engine/src/main/java/com/netflix/maestro/engine/tasks/MaestroTask.java [761:904]
/**
 * Advances the step through its runtime state machine until no further progress can be made in
 * this invocation.
 *
 * <p>Each loop iteration dispatches on the current status held by {@code runtimeSummary}. A case
 * either moves the step to another status (and the loop runs again to handle the new status) or
 * flags the loop as finished.
 *
 * <p>Error handling: a {@link MaestroRetryableError} is logged and rethrown to the caller. Any
 * other exception is logged, recorded via {@code runtimeSummary.markInternalError}, and the loop
 * runs once more so the resulting status is processed by the switch.
 *
 * @param flow the flow the task belongs to
 * @param task the task being executed; its start time is set when the step moves to starting
 * @param workflowSummary summary of the owning workflow instance
 * @param stepDefinition the definition of the step being executed
 * @param runtimeSummary the mutable runtime summary carrying the step status
 * @return {@code true} only when the step is in {@code NOT_CREATED}; {@code false} whenever the
 *     loop exits normally
 */
private boolean doExecute(
    Flow flow,
    Task task,
    WorkflowSummary workflowSummary,
    Step stepDefinition,
    StepRuntimeSummary runtimeSummary) {
  // The flag starts out false, so a do/while visits the switch at least once, exactly like a
  // while loop guarded by the negated flag.
  boolean finished = false;
  do {
    try {
      switch (runtimeSummary.getRuntimeState().getStatus()) {
        case NOT_CREATED:
          // Internal placeholder task: it only exists to keep the state non-terminal.
          return true;

        case CREATED:
          finished = initialize(flow, task, stepDefinition, workflowSummary, runtimeSummary);
          break;

        case INITIALIZED:
          {
            // Either park the step attempt as paused or move on to waiting for signals.
            boolean pausedByBreakpoint =
                stepBreakpointDao.createPausedStepAttemptIfNeeded(
                    workflowSummary.getWorkflowId(),
                    workflowSummary.getWorkflowVersionId(),
                    workflowSummary.getWorkflowInstanceId(),
                    workflowSummary.getWorkflowRunId(),
                    runtimeSummary.getStepId(),
                    runtimeSummary.getStepAttemptId());
            if (pausedByBreakpoint) {
              runtimeSummary.markPaused(tracingManager);
            } else {
              runtimeSummary.markWaitSignal(tracingManager);
            }
            break;
          }

        case PAUSED:
          {
            boolean canResume =
                stepBreakpointDao.shouldStepResume(
                    workflowSummary.getWorkflowId(),
                    workflowSummary.getWorkflowVersionId(),
                    workflowSummary.getWorkflowInstanceId(),
                    workflowSummary.getWorkflowRunId(),
                    runtimeSummary.getStepId(),
                    runtimeSummary.getStepAttemptId());
            if (!canResume) {
              // Still paused: record it on the timeline and stop for this invocation.
              runtimeSummary.addTimeline(
                  TimelineLogEvent.info(
                      "Step is paused for workflowId: [%s], instanceId: [%d],"
                          + "runId: [%d], stepId: [%s], stepAttempt: [%d]",
                      workflowSummary.getWorkflowId(),
                      workflowSummary.getWorkflowInstanceId(),
                      workflowSummary.getWorkflowRunId(),
                      runtimeSummary.getStepId(),
                      runtimeSummary.getStepAttemptId()));
              finished = true;
            } else {
              runtimeSummary.markWaitSignal(tracingManager);
            }
            break;
          }

        case WAITING_FOR_SIGNALS:
          // Not ready means nothing more to do now; ready means move on to param evaluation.
          finished = !signalsReady(workflowSummary, runtimeSummary);
          if (!finished) {
            runtimeSummary.markEvaluateParam(tracingManager);
          }
          break;

        case EVALUATING_PARAMS:
          finished = evaluateParams(flow, task, stepDefinition, workflowSummary, runtimeSummary);
          break;

        case WAITING_FOR_PERMITS:
          finished = !permitsReady(workflowSummary, runtimeSummary);
          if (!finished) {
            // All required tag permits are acquired, so transition to starting and mirror the
            // start time onto the task.
            runtimeSummary.markStarting(tracingManager);
            task.setStartTime(runtimeSummary.getRuntimeState().getStartTime());
          }
          break;

        case STARTING:
          finished = stepRuntimeManager.start(workflowSummary, stepDefinition, runtimeSummary);
          break;

        case RUNNING:
          finished = stepRuntimeManager.execute(workflowSummary, stepDefinition, runtimeSummary);
          break;

        case FINISHING:
          {
            outputDataManager.validateAndMergeOutputParamsAndArtifacts(runtimeSummary);
            boolean outputSignalsSent =
                initializeAndSendOutputSignals(
                    flow, stepDefinition, workflowSummary, runtimeSummary);
            // NOTE(review): when this is false the loop flag stays unset, so this case is
            // entered again on the next iteration -- confirm that is the intended retry path.
            if (outputSignalsSent) {
              runtimeSummary.markTerminated(StepInstance.Status.SUCCEEDED, tracingManager);
            }
            break;
          }

        case UNSATISFIED:
        case DISABLED:
        case SKIPPED:
        case SUCCEEDED:
        case COMPLETED_WITH_ERROR:
          evaluateNextConditionParams(flow, stepDefinition, runtimeSummary);
          finished = true;
          break;

        case FATALLY_FAILED:
          {
            // The step's failure mode is only consulted for FATALLY_FAILED, and only when the
            // runtime summary does not ask to ignore it.
            boolean failureModeApplies = !runtimeSummary.isIgnoreFailureMode();
            if (failureModeApplies
                && FailureMode.IGNORE_FAILURE == stepDefinition.getFailureMode()) {
              runtimeSummary.markTerminated(
                  StepInstance.Status.COMPLETED_WITH_ERROR, tracingManager);
              runtimeSummary.addTimeline(
                  TimelineLogEvent.info(
                      "Step is failed but marked as COMPLETED_WITH_ERROR "
                          + "because its failure mode is IGNORE_FAILURE."));
              // Loop flag is left unset so the new status is handled on the next iteration.
              break;
            }
            if (failureModeApplies
                && FailureMode.FAIL_IMMEDIATELY == stepDefinition.getFailureMode()) {
              // todo this should be better handled by the status listener
              terminateAllSteps(flow, workflowSummary, stepDefinition.getId());
            }
            finished = true;
            break;
          }

        case INTERNALLY_FAILED: // failure mode is ignored: the error happened within Maestro
        case USER_FAILED:
        case PLATFORM_FAILED:
        case TIMEOUT_FAILED:
        case STOPPED:
        case TIMED_OUT:
          finished = true;
          break;

        default:
          throw new MaestroInternalError(
              "Execution is at an unexpected state [%s] for step %s",
              runtimeSummary.getRuntimeState().getStatus(), runtimeSummary.getIdentity());
      }
    } catch (MaestroRetryableError retryable) {
      // Retryable errors are surfaced to the caller untouched after logging.
      LOG.warn(
          "Got a MaestroRetryableError for the task [{}] in flow [{}], ",
          task.getTaskId(),
          flow.getFlowId(),
          retryable);
      throw retryable;
    } catch (Exception failure) {
      LOG.warn(
          "Fatally failed to execute Maestro step for the task [{}] in flow [{}], get an exception:",
          task.getTaskId(),
          flow.getFlowId(),
          failure);
      runtimeSummary.markInternalError(failure, tracingManager);
      // Keep looping: the next iteration handles whatever status the step is in now.
      finished = false;
    }
  } while (!finished);
  return false;
}