in maestro-engine/src/main/java/com/netflix/maestro/engine/utils/TaskHelper.java [146:221]
/**
 * Computes the runtime overview of a workflow from the tasks of its steps.
 *
 * <p>The result is assembled from three pieces: the total step count read from {@code summary},
 * the per-status step summary produced by {@code toStepStatusMap} from each task's runtime state,
 * and a rollup. The rollup is obtained by aggregating one rollup per task (only tasks whose output
 * data contains {@code Constants.STEP_RUNTIME_SUMMARY_FIELD} take part) and then aggregating
 * {@code rollupBase} into that result.
 *
 * @param objectMapper mapper passed to {@code StepHelper} when reading task output data
 * @param summary workflow summary supplying the workflow ids and the total step count
 * @param rollupBase rollup aggregated into the task rollup as the final step; NOTE(review):
 *     whether {@code null} is accepted depends on {@code WorkflowRollupOverview.aggregate} -
 *     confirm
 * @param realTaskMap tasks to inspect; NOTE(review): keys are presumably step ids - confirm
 * @return the overview built by {@code WorkflowRuntimeOverview.of}
 */
public static WorkflowRuntimeOverview computeOverview(
    ObjectMapper objectMapper,
    WorkflowSummary summary,
    WorkflowRollupOverview rollupBase,
    Map<String, Task> realTaskMap) {
  // Read the runtime state out of every task, keeping the keys of the input map.
  Map<String, StepRuntimeState> runtimeStates =
      realTaskMap.entrySet().stream()
          .collect(
              Collectors.toMap(
                  Map.Entry::getKey,
                  entry -> {
                    Task task = entry.getValue();
                    return StepHelper.retrieveStepRuntimeState(task.getOutputData(), objectMapper);
                  }));
  EnumMap<StepInstance.Status, WorkflowStepStatusSummary> statusSummaries =
      toStepStatusMap(summary, runtimeStates);

  // Tasks without a runtime summary in their output data do not contribute to the rollup.
  WorkflowRollupOverview taskRollup =
      realTaskMap.values().stream()
          .filter(task -> task.getOutputData().containsKey(Constants.STEP_RUNTIME_SUMMARY_FIELD))
          .map(task -> toTaskRollup(objectMapper, summary, task))
          .reduce(new WorkflowRollupOverview(), WorkflowRollupOverview::aggregate);
  taskRollup.aggregate(rollupBase);

  return WorkflowRuntimeOverview.of(summary.getTotalStepCount(), statusSummaries, taskRollup);
}

/**
 * Derives the rollup contributed by a single task.
 *
 * <p>A foreach step whose foreach artifact has an overview with a positive checkpoint contributes
 * that overview's overall rollup. A subworkflow step whose subworkflow artifact has an overview
 * contributes that overview's rollup. In every other case the step itself is counted once under
 * its current status.
 */
private static WorkflowRollupOverview toTaskRollup(
    ObjectMapper objectMapper, WorkflowSummary summary, Task task) {
  StepRuntimeSummary runtimeSummary =
      StepHelper.retrieveRuntimeSummary(objectMapper, task.getOutputData());
  switch (runtimeSummary.getType()) {
    case FOREACH: {
      if (!runtimeSummary.getArtifacts().containsKey(Artifact.Type.FOREACH.key())) {
        break;
      }
      ForeachArtifact foreachArtifact =
          runtimeSummary.getArtifacts().get(Artifact.Type.FOREACH.key()).asForeach();
      if (foreachArtifact.getForeachOverview() == null
          || !(foreachArtifact.getForeachOverview().getCheckpoint() > 0)) {
        break;
      }
      return foreachArtifact.getForeachOverview().getOverallRollup();
    }
    case SUBWORKFLOW: {
      if (!runtimeSummary.getArtifacts().containsKey(Artifact.Type.SUBWORKFLOW.key())) {
        break;
      }
      SubworkflowArtifact subworkflowArtifact =
          runtimeSummary.getArtifacts().get(Artifact.Type.SUBWORKFLOW.key()).asSubworkflow();
      if (subworkflowArtifact.getSubworkflowOverview() == null) {
        break;
      }
      return subworkflowArtifact.getSubworkflowOverview().getRollupOverview();
    }
    case TEMPLATE:
    default:
      break;
  }
  // No nested rollup was available, so fall back to counting this step on its own.
  return singleStepRollup(summary, runtimeSummary, task);
}

/**
 * Builds a rollup that counts one step under the status read from the task's output data. A
 * reference to the step (workflow reference mapped to a one-element list holding the step
 * reference) is attached only when the status reports {@code isOverview()}.
 */
private static WorkflowRollupOverview singleStepRollup(
    WorkflowSummary summary, StepRuntimeSummary runtimeSummary, Task task) {
  StepInstance.Status stepStatus = StepHelper.retrieveStepStatus(task.getOutputData());
  WorkflowRollupOverview.CountReference countRef = new WorkflowRollupOverview.CountReference();
  countRef.setCnt(1);
  if (stepStatus.isOverview()) {
    countRef.setRef(
        Collections.singletonMap(
            RollupAggregationHelper.getReference(
                summary.getWorkflowId(), summary.getWorkflowRunId()),
            Collections.singletonList(
                RollupAggregationHelper.getReference(
                    summary.getWorkflowInstanceId(),
                    runtimeSummary.getStepId(),
                    runtimeSummary.getStepAttemptId()))));
  }
  return WorkflowRollupOverview.of(1L, Collections.singletonMap(stepStatus, countRef));
}