public void onFlowFinalized()

in maestro-engine/src/main/java/com/netflix/maestro/engine/listeners/MaestroFinalFlowStatusCallback.java [87:212]


  public void onFlowFinalized(Flow flow) {
    WorkflowSummary summary = StepHelper.retrieveWorkflowSummary(objectMapper, flow.getInput());
    WorkflowRuntimeSummary runtimeSummary = retrieveWorkflowRuntimeSummary(flow);
    String reason = flow.getReasonForIncompletion();
    LOG.info(
        "Workflow {} with execution_id [{}] is finalized with internal state [{}] and reason [{}]",
        summary.getIdentity(),
        flow.getFlowId(),
        flow.getStatus(),
        reason);
    metrics.counter(
        MetricConstants.FINAL_FLOW_STATUS_CALL_BACK_METRIC,
        getClass(),
        MetricConstants.TYPE_TAG,
        "onFlowFinalized",
        MetricConstants.STATUS_TAG,
        flow.getStatus().name());

    if (reason != null
        && flow.getStatus() == Flow.Status.FAILED
        && reason.startsWith(MaestroStartTask.DEDUP_FAILURE_PREFIX)) {
      LOG.info(
          "Workflow {} with execution_id [{}] has not actually started, thus skip onFlowFinalized.",
          summary.getIdentity(),
          flow.getFlowId());
      return; // special case doing nothing
    }

    WorkflowInstance.Status instanceStatus =
        instanceDao.getWorkflowInstanceStatus(
            summary.getWorkflowId(), summary.getWorkflowInstanceId(), summary.getWorkflowRunId());
    if (instanceStatus == null || (instanceStatus.isTerminal() && flow.getStatus().isTerminal())) {
      LOG.info(
          "Workflow {} with execution_id [{}] does not exist or already "
              + "in a terminal state [{}] with internal state [{}], thus skip onFlowFinalized.",
          summary.getIdentity(),
          flow.getFlowId(),
          instanceStatus,
          flow.getStatus());
      return;
    }

    LOG.info(
        "Workflow {} with execution_id [{}] is not in a terminal state [{}] "
            + "with internal state [{}], thus run onFlowFinalized.",
        summary.getIdentity(),
        flow.getFlowId(),
        instanceStatus,
        flow.getStatus());
    Map<String, Task> realTaskMap =
        TaskHelper.getUserDefinedRealTaskMap(flow.getStreamOfAllTasks());
    // cancel internally failed tasks
    realTaskMap.values().stream()
        .filter(task -> !StepHelper.retrieveStepStatus(task.getOutputData()).isTerminal())
        .forEach(task -> maestroTask.cancel(flow, task));

    WorkflowRuntimeOverview overview =
        TaskHelper.computeOverview(
            objectMapper, summary, runtimeSummary.getRollupBase(), realTaskMap);

    try {
      validateAndUpdateOverview(overview, summary);
      switch (flow.getStatus()) {
        case TERMINATED: // stopped due to stop request
          if (reason != null && reason.startsWith(FAILURE_REASON_PREFIX)) {
            update(flow, WorkflowInstance.Status.FAILED, summary, overview);
          } else {
            update(flow, WorkflowInstance.Status.STOPPED, summary, overview);
          }
          break;
        case TIMED_OUT:
          update(flow, WorkflowInstance.Status.TIMED_OUT, summary, overview);
          break;
        default: // other status (FAILED, COMPLETED, RUNNING) to be handled here.
          Optional<Task.Status> done =
              TaskHelper.checkProgress(realTaskMap, summary, overview, true);
          switch (done.orElse(Task.Status.IN_PROGRESS)) {
              /**
               * This is a special status to indicate that the workflow has succeeded. Check {@link
               * TaskHelper#checkProgress} for more details.
               */
            case FAILED_WITH_TERMINAL_ERROR:
              WorkflowInstance.Status nextStatus =
                  AggregatedViewHelper.deriveAggregatedStatus(
                      instanceDao, summary, WorkflowInstance.Status.SUCCEEDED, overview);
              if (!nextStatus.isTerminal()) {
                throw new MaestroInternalError(
                    "Invalid status: [%s], expecting a terminal one", nextStatus);
              }
              update(flow, nextStatus, summary, overview);
              break;
            case FAILED:
            case CANCELED: // due to step failure
              update(flow, WorkflowInstance.Status.FAILED, summary, overview);
              break;
            case TIMED_OUT:
              update(flow, WorkflowInstance.Status.TIMED_OUT, summary, overview);
              break;
              // all other status are invalid
            default:
              metrics.counter(
                  MetricConstants.FINAL_FLOW_STATUS_CALL_BACK_METRIC,
                  getClass(),
                  MetricConstants.TYPE_TAG,
                  "invalidStatusOnFlowFinalized");
              throw new MaestroInternalError(
                  "Invalid status [%s] onFlowFinalized", flow.getStatus());
          }
          break;
      }
    } catch (MaestroInternalError | IllegalArgumentException e) {
      // non-retryable error and still fail the instance
      LOG.warn("onFlowFinalized is failed with a non-retryable error", e);
      metrics.counter(
          MetricConstants.FINAL_FLOW_STATUS_CALL_BACK_METRIC,
          getClass(),
          MetricConstants.TYPE_TAG,
          "nonRetryableErrorOnFlowFinalized");
      update(
          flow,
          WorkflowInstance.Status.FAILED,
          summary,
          overview,
          Details.create(e.getMessage(), "onFlowFinalized is failed with non-retryable error."));
    }
  }