public static Map computeDag()

in maestro-engine/src/main/java/com/netflix/maestro/engine/transformation/WorkflowGraph.java [175:243]


  public static Map<String, StepTransition> computeDag(
      Workflow workflow, List<String> startStepIds, List<String> endStepIds) {
    Map<String, Step> stepMap =
        workflow.getSteps().stream()
            .collect(
                Collectors.toMap(
                    Step::getId,
                    Function.identity(),
                    (step1, step2) -> {
                      throw new IllegalArgumentException(
                          String.format(
                              "Invalid definition of workflow [%s], where two steps have the same id [%s]",
                              workflow.getId(), step1.getId()));
                    }));

    if (startStepIds != null) {
      Map<String, Step> visited = new HashMap<>();
      Queue<Step> queue = new ArrayDeque<>();
      for (String stepId : startStepIds) {
        Step step =
            Checks.notNull(
                stepMap.get(stepId),
                "Cannot start the graph from step id [%s] as workflow does not contain it.",
                stepId);
        step.getTransition().getPredecessors().clear();
        visited.put(step.getId(), step);
        queue.add(step);
      }
      if (!ObjectHelper.isCollectionEmptyOrNull(endStepIds)) {
        for (String stepId : endStepIds) {
          Step step =
              Checks.notNull(
                  stepMap.get(stepId),
                  "Cannot end the graph with step id [%s] as workflow does not contain it.",
                  stepId);
          step.getTransition().getSuccessors().clear();
          visited.put(step.getId(), step);
        }
      }

      while (!queue.isEmpty()) {
        Step step = queue.remove();
        for (String successor : step.getTransition().getSuccessors().keySet()) {
          if (!visited.containsKey(successor)) {
            Step toAdd = stepMap.get(successor);
            queue.add(toAdd);
            visited.put(toAdd.getId(), toAdd);
          }
        }
      }
      stepMap = visited;
    }

    Map<String, GraphNode> nodeMap = computeNodeMap(workflow.getId(), stepMap);

    for (GraphNode node : nodeMap.values()) {
      // add predecessors if empty
      if (stepMap.get(node.stepId).getTransition().getPredecessors().isEmpty()) {
        stepMap.get(node.stepId).getTransition().getPredecessors().addAll(node.parents.keySet());
      }
    }

    Checks.checkTrue(
        !containsCycleInDag(nodeMap),
        "Invalid workflow definition [%s], where DAG contains cycle",
        workflow.getId());

    return stepMap.values().stream().collect(MapHelper.toListMap(Step::getId, Step::getTransition));
  }