private Optional launchForeachIterations()

in maestro-engine/src/main/java/com/netflix/maestro/engine/steps/ForeachStepRuntime.java [717:781]


  /**
   * Launches foreach iterations in batches, starting from the position held in {@code index}.
   *
   * <p>Iteration indices are walked upwards from {@code index.get()}. The instance id of an
   * iteration is its index plus one; instance ids reported as skipped by the foreach overview are
   * passed over without creating a run request. Every other iteration gets a run request that
   * must be admitted by {@code instanceStepConcurrencyHandler}; a refusal ends the walk and leaves
   * that iteration unconsumed.
   *
   * <p>Buffered requests are handed to {@code actionHandler.runForeachBatch} whenever the buffer
   * size equals {@code properties.getInsertBatchLimit()} or the walk has finished. After a batch
   * call that returns an empty result, {@code index} is moved to one past the last consumed
   * iteration and the buffers are emptied.
   *
   * @param workflowSummary summary of the parent workflow; its internal id, version id and run
   *     properties are forwarded to the batch call
   * @param step the foreach step being executed
   * @param runtimeSummary runtime summary of the foreach step
   * @param artifact foreach artifact supplying the workflow id, identity, overview and loop count
   * @param index in/out cursor: read as the first iteration index to consider, updated after each
   *     batch call that returns an empty result
   * @param launchLimit the walk stops once the number of admitted iterations equals this value
   * @return the non-empty result of a batch call as soon as one occurs (presumably describing a
   *     failure - confirm against {@code runForeachBatch}); otherwise {@link Optional#empty()}
   */
  private Optional<Details> launchForeachIterations(
      WorkflowSummary workflowSummary,
      ForeachStep step,
      StepRuntimeSummary runtimeSummary,
      ForeachArtifact artifact,
      AtomicInteger index,
      final int launchLimit) {
    final Workflow foreachWorkflow =
        createInlineWorkflow(artifact.getForeachWorkflowId(), artifact.getForeachIdentity(), step);
    final List<RunRequest> pendingRequests = new ArrayList<>();
    final List<Long> pendingInstanceIds = new ArrayList<>();

    final Set<Long> iterationsToSkip =
        artifact
            .getForeachOverview()
            .getSkippedIterationsInRange(index.get() + 1, artifact.getAncestorIterationCount());

    // Nothing left to launch: the cursor is already at or past the end of the loop.
    if (index.get() >= artifact.getTotalLoopCount()) {
      return Optional.empty();
    }

    int cursor = index.get();
    int admitted = 0;
    boolean finished = false;
    while (!finished) {
      final long iterationInstanceId = cursor + 1;
      if (!iterationsToSkip.contains(iterationInstanceId)) {
        final RunRequest request =
            StepHelper.createInternalWorkflowRunRequest(
                workflowSummary,
                runtimeSummary,
                Collections.singletonList(Tag.create(FOREACH_TAG_NAME)),
                createForeachRunParams(cursor, workflowSummary, step, runtimeSummary),
                generateDedupKey(artifact, cursor));

        if (instanceStepConcurrencyHandler.addInstance(request)) {
          pendingRequests.add(request);
          pendingInstanceIds.add(iterationInstanceId);
          admitted++;
          finished = (admitted == launchLimit);
        } else {
          // The current iteration was not admitted: step the cursor back so that it is not
          // counted as consumed, and stop walking.
          cursor--;
          finished = true;
        }
      }

      // Reaching the last iteration index also ends the walk.
      if (!finished) {
        finished = (cursor == artifact.getTotalLoopCount() - 1);
      }

      final boolean batchIsFull = pendingRequests.size() == properties.getInsertBatchLimit();
      if (batchIsFull || finished) {
        // run (start or restart) the buffered foreach batch; the inline foreach instances inherit
        // the parent's internal id, version id and run properties
        final Optional<Details> batchResult =
            actionHandler.runForeachBatch(
                foreachWorkflow,
                workflowSummary.getInternalId(),
                workflowSummary.getWorkflowVersionId(),
                workflowSummary.getRunProperties(),
                step.getId(),
                artifact,
                pendingRequests,
                pendingInstanceIds,
                properties.getRunJobBatchLimit());
        if (batchResult.isPresent()) {
          // Hand the result back immediately; the index is deliberately left untouched here.
          return batchResult;
        }
        index.set(cursor + 1);
        pendingRequests.clear();
        pendingInstanceIds.clear();
      }

      cursor++;
    }
    return Optional.empty();
  }