private Optional getAction()

in maestro-engine/src/main/java/com/netflix/maestro/engine/dao/MaestroStepInstanceActionDao.java [480:563]


  private Optional<StepAction> getAction(WorkflowSummary summary, String stepId) {
    Initiator initiator = summary.getInitiator();
    List<UpstreamInitiator.Info> path = new ArrayList<>();
    StringBuilder sqlBuilder = new StringBuilder(GET_ACTION_QUERY);
    if (initiator instanceof UpstreamInitiator) {
      path.addAll(((UpstreamInitiator) initiator).getAncestors());
      sqlBuilder
          .append(" OR (")
          .append(String.join(") OR (", Collections.nCopies(path.size(), CONDITION_POSTFIX)))
          .append(')');
    }
    String sql = sqlBuilder.toString();

    UpstreamInitiator.Info self = new UpstreamInitiator.Info();
    self.setWorkflowId(summary.getWorkflowId());
    self.setInstanceId(summary.getWorkflowInstanceId());
    self.setRunId(summary.getWorkflowRunId());
    self.setStepId(stepId);
    path.add(self);

    List<StepAction> allActions =
        withMetricLogError(
            () ->
                withRetryableQuery(
                    sql,
                    stmt -> {
                      int idx = 0;
                      for (UpstreamInitiator.Info node : path) {
                        stmt.setString(++idx, node.getWorkflowId());
                        stmt.setLong(++idx, node.getInstanceId());
                        stmt.setLong(++idx, node.getRunId());
                        stmt.setString(++idx, node.getStepId());
                      }
                    },
                    result -> {
                      List<StepAction> actions = new ArrayList<>();
                      while (result.next()) {
                        StepAction res =
                            fromJson(result.getString(PAYLOAD_COLUMN), StepAction.class);
                        res.setCreateTime(result.getTimestamp("create_ts").getTime());
                        actions.add(res);
                      }
                      return actions;
                    }),
            "getActions",
            "Failed to get the actions for step {}[{}] with parents",
            summary.getIdentity(),
            stepId);

    AtomicInteger ordinal = new AtomicInteger(0);
    Map<UpstreamInitiator.Info, Integer> orders =
        path.stream()
            .collect(Collectors.toMap(Function.identity(), p -> ordinal.incrementAndGet()));

    allActions.sort(
        (a1, a2) ->
            Integer.compare(
                orders.getOrDefault(a2.toInfo(), 0), orders.getOrDefault(a1.toInfo(), 0)));

    // pick the final action from all of them.
    StepAction stepAction = null;
    for (StepAction action : allActions) {
      if (action.getAction() != null && action.getAction().isUsingUpstream()) {
        stepAction = action;
        break;
      } else if (System.currentTimeMillis() - action.getCreateTime() < actionTimeout
          && action.getWorkflowId().equals(self.getWorkflowId())
          && action.getWorkflowInstanceId() == self.getInstanceId()
          && action.getWorkflowRunId() == self.getRunId()
          && action.getStepId().equals(self.getStepId())) {
        stepAction = action;
      }
    }
    if (stepAction != null) {
      LOG.info(
          "Pick a pending action [{}] for step {}[{}] among total [{}] pending actions: [{}]",
          stepAction,
          summary.getIdentity(),
          stepId,
          allActions.size(),
          allActions);
    }
    return Optional.ofNullable(stepAction);
  }