in maestro-engine/src/main/java/com/netflix/maestro/engine/steps/ForeachStepRuntime.java [717:781]
/**
 * Builds run requests for foreach iterations, starting at the zero-based position currently held
 * in {@code index}, and submits them in batches through {@code actionHandler.runForeachBatch}.
 *
 * <p>Instance ids are one-based ({@code position + 1}). Positions whose instance id is contained
 * in the skipped set returned by the foreach overview produce no run request. The scan stops as
 * soon as one of the following happens:
 *
 * <ul>
 *   <li>the number of accepted run requests equals {@code launchLimit};
 *   <li>{@code instanceStepConcurrencyHandler.addInstance} returns {@code false} for a request
 *       (that position is then treated as not processed);
 *   <li>the last position ({@code totalLoopCount - 1}) has been processed.
 * </ul>
 *
 * <p>A batch is submitted whenever the pending request list reaches the configured insert batch
 * limit or the scan stops. After each batch that yields no details, {@code index} is moved to the
 * position following the last processed one and the pending lists are emptied.
 *
 * <p>NOTE(review): the launch cap is checked with equality after at least one request has been
 * accepted, so a non-positive {@code launchLimit} never matches - confirm callers always pass a
 * positive value.
 *
 * @param workflowSummary summary of the parent workflow instance
 * @param step the foreach step definition
 * @param runtimeSummary runtime summary of the foreach step
 * @param artifact foreach artifact supplying the inline workflow identity, loop count and overview
 * @param index in/out holder of the next zero-based iteration position to process
 * @param launchLimit number of accepted run requests after which the scan stops
 * @return the details returned by a batch submission if any were present, otherwise empty
 */
private Optional<Details> launchForeachIterations(
    WorkflowSummary workflowSummary,
    ForeachStep step,
    StepRuntimeSummary runtimeSummary,
    ForeachArtifact artifact,
    AtomicInteger index,
    final int launchLimit) {
  Workflow inlineWorkflow =
      createInlineWorkflow(artifact.getForeachWorkflowId(), artifact.getForeachIdentity(), step);

  final int start = index.get();
  Set<Long> toSkip =
      artifact
          .getForeachOverview()
          .getSkippedIterationsInRange(start + 1, artifact.getAncestorIterationCount());

  List<RunRequest> pendingRequests = new ArrayList<>();
  List<Long> pendingInstanceIds = new ArrayList<>();

  int accepted = 0;
  int cursor = start;
  boolean finished = start >= artifact.getTotalLoopCount();
  while (!finished) {
    // instance ids are one-based; keep it a long so the lookup in Set<Long> matches
    long iterationId = cursor + 1;
    if (!toSkip.contains(iterationId)) {
      RunRequest request =
          StepHelper.createInternalWorkflowRunRequest(
              workflowSummary,
              runtimeSummary,
              Collections.singletonList(Tag.create(FOREACH_TAG_NAME)),
              createForeachRunParams(cursor, workflowSummary, step, runtimeSummary),
              generateDedupKey(artifact, cursor));
      if (!instanceStepConcurrencyHandler.addInstance(request)) {
        // this position was not launched, so step back to leave it as the next one to process
        cursor--;
        finished = true;
      } else {
        pendingRequests.add(request);
        pendingInstanceIds.add(iterationId);
        accepted++;
        finished = accepted == launchLimit;
      }
    }

    if (!finished) {
      // reaching the final position also ends the scan
      finished = cursor == artifact.getTotalLoopCount() - 1;
    }

    boolean batchFull = pendingRequests.size() == properties.getInsertBatchLimit();
    if (batchFull || finished) {
      // run (start or restart) the batch; inline foreach instances inherit the parent's
      // unique internal id, version id and run properties
      Optional<Details> details =
          actionHandler.runForeachBatch(
              inlineWorkflow,
              workflowSummary.getInternalId(),
              workflowSummary.getWorkflowVersionId(),
              workflowSummary.getRunProperties(),
              step.getId(),
              artifact,
              pendingRequests,
              pendingInstanceIds,
              properties.getRunJobBatchLimit());
      if (details.isPresent()) {
        return details;
      }
      // batch went through: record progress and start collecting the next batch
      index.set(cursor + 1);
      pendingRequests.clear();
      pendingInstanceIds.clear();
    }

    cursor++;
  }
  return Optional.empty();
}