in maestro-engine/src/main/java/com/netflix/maestro/engine/dao/MaestroStepInstanceActionDao.java [480:563]
private Optional<StepAction> getAction(WorkflowSummary summary, String stepId) {
Initiator initiator = summary.getInitiator();
List<UpstreamInitiator.Info> path = new ArrayList<>();
StringBuilder sqlBuilder = new StringBuilder(GET_ACTION_QUERY);
if (initiator instanceof UpstreamInitiator) {
path.addAll(((UpstreamInitiator) initiator).getAncestors());
sqlBuilder
.append(" OR (")
.append(String.join(") OR (", Collections.nCopies(path.size(), CONDITION_POSTFIX)))
.append(')');
}
String sql = sqlBuilder.toString();
UpstreamInitiator.Info self = new UpstreamInitiator.Info();
self.setWorkflowId(summary.getWorkflowId());
self.setInstanceId(summary.getWorkflowInstanceId());
self.setRunId(summary.getWorkflowRunId());
self.setStepId(stepId);
path.add(self);
List<StepAction> allActions =
withMetricLogError(
() ->
withRetryableQuery(
sql,
stmt -> {
int idx = 0;
for (UpstreamInitiator.Info node : path) {
stmt.setString(++idx, node.getWorkflowId());
stmt.setLong(++idx, node.getInstanceId());
stmt.setLong(++idx, node.getRunId());
stmt.setString(++idx, node.getStepId());
}
},
result -> {
List<StepAction> actions = new ArrayList<>();
while (result.next()) {
StepAction res =
fromJson(result.getString(PAYLOAD_COLUMN), StepAction.class);
res.setCreateTime(result.getTimestamp("create_ts").getTime());
actions.add(res);
}
return actions;
}),
"getActions",
"Failed to get the actions for step {}[{}] with parents",
summary.getIdentity(),
stepId);
AtomicInteger ordinal = new AtomicInteger(0);
Map<UpstreamInitiator.Info, Integer> orders =
path.stream()
.collect(Collectors.toMap(Function.identity(), p -> ordinal.incrementAndGet()));
allActions.sort(
(a1, a2) ->
Integer.compare(
orders.getOrDefault(a2.toInfo(), 0), orders.getOrDefault(a1.toInfo(), 0)));
// pick the final action from all of them.
StepAction stepAction = null;
for (StepAction action : allActions) {
if (action.getAction() != null && action.getAction().isUsingUpstream()) {
stepAction = action;
break;
} else if (System.currentTimeMillis() - action.getCreateTime() < actionTimeout
&& action.getWorkflowId().equals(self.getWorkflowId())
&& action.getWorkflowInstanceId() == self.getInstanceId()
&& action.getWorkflowRunId() == self.getRunId()
&& action.getStepId().equals(self.getStepId())) {
stepAction = action;
}
}
if (stepAction != null) {
LOG.info(
"Pick a pending action [{}] for step {}[{}] among total [{}] pending actions: [{}]",
stepAction,
summary.getIdentity(),
stepId,
allActions.size(),
allActions);
}
return Optional.ofNullable(stepAction);
}