in core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java [287:438]
private Object runSubworkflowsWithConcurrency(WorkflowStepInstanceExecutionContext context, List<WorkflowExecutionContext> nestedWorkflowContexts, Map reducingV, boolean wasList, boolean isReplaying, ReplayContinuationInstructions instructionsIfReplaying) {
LOG.debug("Running sub-workflows "+nestedWorkflowContexts);
if (nestedWorkflowContexts.isEmpty()) return reducingV!=null ? reducingV : MutableList.of();
long ci = 1;
Object c = concurrency;
if (c != null && wasList) {
if (reducingV!=null) {
// if reducing, force concurrency 1, and also disallow dynamic concurrency (to prevent things that work in tests with dynamic value resolving to 1 but fail in real life when not 1)
Maybe<Integer> cm = TypeCoercions.tryCoerce(c, Integer.class);
if (cm.isAbsent() || cm.get()!=1)
throw new IllegalArgumentException("Concurrency cannot be used unless static value 1 when reducing");
ci = 1;
} else {
c = context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_RUNNING, c, Object.class);
if (c instanceof Number) {
// okay
} else if (c instanceof String) {
c = WorkflowConcurrencyParser.parse((String) c).apply((double) nestedWorkflowContexts.size());
} else {
throw new IllegalArgumentException("Unsupported concurrency object: '" + c + "'");
}
ci = (long) Math.floor(0.000001 + ((Number) c).doubleValue());
if (ci <= 0)
throw new IllegalArgumentException("Invalid concurrency value: " + ci + " (concurrency " + c + ", target size " + nestedWorkflowContexts.size() + ")");
}
}
AtomicInteger availableThreads = ci == 1 ? null : new AtomicInteger((int) ci);
List<Task<?>> submitted = MutableList.of();
List<Pair<WorkflowExecutionContext,Task<?>>> delayedBecauseReducing = MutableList.of();
WorkflowExecutionContext lastWorkflowRunBeforeReplay = null;
List<Throwable> errors = MutableList.of();
for (int i = 0; i < nestedWorkflowContexts.size(); i++) {
if (availableThreads != null) while (availableThreads.get() <= 0) {
try {
Tasks.withBlockingDetails("Waiting before running remaining " + (nestedWorkflowContexts.size() - i) + " instances because " + ci + " are currently running",
() -> {
synchronized (availableThreads) {
availableThreads.wait(500);
}
return null;
});
} catch (Exception e) {
throw Exceptions.propagate(e);
}
}
Task<Object> task;
if (!isReplaying) {
task = nestedWorkflowContexts.get(i).getTask(false).get();
} else {
try {
Pair<Boolean, Object> check = WorkflowReplayUtils.checkReplayResumingInSubWorkflowAlsoReturningTaskOrResult("nested workflow " + (i + 1), context, nestedWorkflowContexts.get(i), instructionsIfReplaying,
(w, e) -> {
throw new IllegalStateException("Sub workflow " + w + " is not replayable", e);
}, false);
if (check.getLeft()) {
task = (Task<Object>) check.getRight();
} else {
// completed, skip run; workflow output will be set and used by caller (so can ignore check.getRight() here)
task = null;
// unless we are reducing, in which case we need to track which one ran last
lastWorkflowRunBeforeReplay = nestedWorkflowContexts.get(i);
}
} catch (Exception e) {
errors.add(e);
task = null;
}
}
if (task != null) {
if (Entities.isUnmanagingOrNoLongerManaged(context.getEntity())) {
// on shutdown don't keep running tasks
task.cancel(false);
} else {
if (reducingV!=null) {
delayedBecauseReducing.add(Pair.of(nestedWorkflowContexts.get(i), task));
} else {
if (availableThreads != null) {
availableThreads.decrementAndGet();
((EntityInternal) context.getEntity()).getExecutionContext().submit(MutableMap.of("newTaskEndCallback", (Runnable) () -> {
availableThreads.incrementAndGet();
synchronized (availableThreads) {
availableThreads.notifyAll();
}
}),
task);
}
DynamicTasks.queue(task);
submitted.add(task);
}
}
}
}
if (reducingV==null) {
assert delayedBecauseReducing.isEmpty();
submitted.forEach(t -> {
try {
if (!t.isSubmitted() && !errors.isEmpty()) {
// if concurrent, all tasks will be submitted, and we should wait;
// if not, then there might be queued tasks not yet submitted; if there are errors, they will never be submitted
return;
}
t.get();
} catch (Throwable tt) {
errors.add(tt);
}
});
if (!errors.isEmpty()) {
throw Exceptions.propagate("Error"+(errors.size()>1 ? "s" : "")+" running sub-workflow"+(nestedWorkflowContexts.size()>1 ? "s" : "")+" in "+context.getWorkflowStepReference(), errors);
}
} else {
// if reducing we need to wrap each execution to get/set last values
assert submitted.isEmpty();
if (lastWorkflowRunBeforeReplay!=null) {
// if interrupted we need to explicitly take from the last step
reducingV = updateReducingWorkflowVarsFromLastStep(lastWorkflowRunBeforeReplay, reducingV);
}
for (Pair<WorkflowExecutionContext, Task<?>> p : delayedBecauseReducing) {
WorkflowExecutionContext wc = p.getLeft();
if (wc.getCurrentStepIndex()==null || wc.getCurrentStepIndex()==WorkflowExecutionContext.STEP_INDEX_FOR_START) {
// initialize to last if it hasn't started
wc.updateWorkflowScratchVariables(reducingV);
}
DynamicTasks.queue(p.getRight()).getUnchecked();
reducingV = updateReducingWorkflowVarsFromLastStep(wc, reducingV);
}
}
Object returnValue;
if (reducingV==null) {
List result = MutableList.of();
nestedWorkflowContexts.forEach(nw -> result.add(nw.getOutput()));
if (!wasList && result.size() != 1) {
throw new IllegalStateException("Result mismatch, non-list target " + target + " yielded output " + result);
}
context.setOutput(result);
returnValue = !wasList ? Iterables.getOnlyElement(result) : result;
} else {
context.setOutput(reducingV);
context.getWorkflowExectionContext().updateWorkflowScratchVariables(reducingV);
returnValue = reducingV;
}
return returnValue;
}