in maestro-engine/src/main/java/com/netflix/maestro/engine/listeners/MaestroFinalFlowStatusCallback.java [87:212]
public void onFlowFinalized(Flow flow) {
WorkflowSummary summary = StepHelper.retrieveWorkflowSummary(objectMapper, flow.getInput());
WorkflowRuntimeSummary runtimeSummary = retrieveWorkflowRuntimeSummary(flow);
String reason = flow.getReasonForIncompletion();
LOG.info(
"Workflow {} with execution_id [{}] is finalized with internal state [{}] and reason [{}]",
summary.getIdentity(),
flow.getFlowId(),
flow.getStatus(),
reason);
metrics.counter(
MetricConstants.FINAL_FLOW_STATUS_CALL_BACK_METRIC,
getClass(),
MetricConstants.TYPE_TAG,
"onFlowFinalized",
MetricConstants.STATUS_TAG,
flow.getStatus().name());
if (reason != null
&& flow.getStatus() == Flow.Status.FAILED
&& reason.startsWith(MaestroStartTask.DEDUP_FAILURE_PREFIX)) {
LOG.info(
"Workflow {} with execution_id [{}] has not actually started, thus skip onFlowFinalized.",
summary.getIdentity(),
flow.getFlowId());
return; // special case doing nothing
}
WorkflowInstance.Status instanceStatus =
instanceDao.getWorkflowInstanceStatus(
summary.getWorkflowId(), summary.getWorkflowInstanceId(), summary.getWorkflowRunId());
if (instanceStatus == null || (instanceStatus.isTerminal() && flow.getStatus().isTerminal())) {
LOG.info(
"Workflow {} with execution_id [{}] does not exist or already "
+ "in a terminal state [{}] with internal state [{}], thus skip onFlowFinalized.",
summary.getIdentity(),
flow.getFlowId(),
instanceStatus,
flow.getStatus());
return;
}
LOG.info(
"Workflow {} with execution_id [{}] is not in a terminal state [{}] "
+ "with internal state [{}], thus run onFlowFinalized.",
summary.getIdentity(),
flow.getFlowId(),
instanceStatus,
flow.getStatus());
Map<String, Task> realTaskMap =
TaskHelper.getUserDefinedRealTaskMap(flow.getStreamOfAllTasks());
// cancel internally failed tasks
realTaskMap.values().stream()
.filter(task -> !StepHelper.retrieveStepStatus(task.getOutputData()).isTerminal())
.forEach(task -> maestroTask.cancel(flow, task));
WorkflowRuntimeOverview overview =
TaskHelper.computeOverview(
objectMapper, summary, runtimeSummary.getRollupBase(), realTaskMap);
try {
validateAndUpdateOverview(overview, summary);
switch (flow.getStatus()) {
case TERMINATED: // stopped due to stop request
if (reason != null && reason.startsWith(FAILURE_REASON_PREFIX)) {
update(flow, WorkflowInstance.Status.FAILED, summary, overview);
} else {
update(flow, WorkflowInstance.Status.STOPPED, summary, overview);
}
break;
case TIMED_OUT:
update(flow, WorkflowInstance.Status.TIMED_OUT, summary, overview);
break;
default: // other status (FAILED, COMPLETED, RUNNING) to be handled here.
Optional<Task.Status> done =
TaskHelper.checkProgress(realTaskMap, summary, overview, true);
switch (done.orElse(Task.Status.IN_PROGRESS)) {
/**
* This is a special status to indicate that the workflow has succeeded. Check {@link
* TaskHelper#checkProgress} for more details.
*/
case FAILED_WITH_TERMINAL_ERROR:
WorkflowInstance.Status nextStatus =
AggregatedViewHelper.deriveAggregatedStatus(
instanceDao, summary, WorkflowInstance.Status.SUCCEEDED, overview);
if (!nextStatus.isTerminal()) {
throw new MaestroInternalError(
"Invalid status: [%s], expecting a terminal one", nextStatus);
}
update(flow, nextStatus, summary, overview);
break;
case FAILED:
case CANCELED: // due to step failure
update(flow, WorkflowInstance.Status.FAILED, summary, overview);
break;
case TIMED_OUT:
update(flow, WorkflowInstance.Status.TIMED_OUT, summary, overview);
break;
// all other status are invalid
default:
metrics.counter(
MetricConstants.FINAL_FLOW_STATUS_CALL_BACK_METRIC,
getClass(),
MetricConstants.TYPE_TAG,
"invalidStatusOnFlowFinalized");
throw new MaestroInternalError(
"Invalid status [%s] onFlowFinalized", flow.getStatus());
}
break;
}
} catch (MaestroInternalError | IllegalArgumentException e) {
// non-retryable error and still fail the instance
LOG.warn("onFlowFinalized is failed with a non-retryable error", e);
metrics.counter(
MetricConstants.FINAL_FLOW_STATUS_CALL_BACK_METRIC,
getClass(),
MetricConstants.TYPE_TAG,
"nonRetryableErrorOnFlowFinalized");
update(
flow,
WorkflowInstance.Status.FAILED,
summary,
overview,
Details.create(e.getMessage(), "onFlowFinalized is failed with non-retryable error."));
}
}