private Object runSubworkflowsWithConcurrency()

in core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java [287:438]


    /**
     * Runs the given nested workflows and gathers their results, either as independent
     * sub-workflows (optionally several at a time) or, when {@code reducingV} is supplied,
     * strictly one after another with the reducing variables threaded through each run.
     *
     * <p>Non-reducing mode: each sub-workflow task is queued via {@link DynamicTasks#queue};
     * when the effective concurrency is not 1 it is additionally submitted to the entity's
     * execution context, and the loop blocks while the number of running tasks has reached
     * the limit. All queued tasks are then awaited and their failures collected.
     *
     * <p>Reducing mode: tasks are not started in the main loop; afterwards each one is run in
     * order, with the current reducing variables written to the sub-workflow's scratch
     * variables if it has not yet started, and read back through
     * {@code updateReducingWorkflowVarsFromLastStep} once it finishes.
     *
     * <p>The concurrency limit and the target named in error messages are read from the
     * {@code concurrency} and {@code target} members declared outside this method.
     *
     * @param context                 the step context; used to resolve the concurrency expression,
     *                                to find the entity whose execution context runs the tasks,
     *                                and to record the step output
     * @param nestedWorkflowContexts  the sub-workflows to run, in order
     * @param reducingV               the reducing variables, or {@code null} when not reducing
     * @param wasList                 whether the target was a list; concurrency is only evaluated when
     *                                true, and when false exactly one output is expected and is
     *                                returned unwrapped
     * @param isReplaying             whether this is a replay, in which case each sub-workflow is
     *                                checked to see if it should be resumed or has already completed
     * @param instructionsIfReplaying the replay instructions passed to the replay check
     * @return if there are no nested workflows, {@code reducingV} when non-null, otherwise an empty list;
     *         when reducing, the final reducing variables; otherwise the list of sub-workflow
     *         outputs, or the single output when {@code wasList} is false
     * @throws IllegalArgumentException if the concurrency value is unsupported, not positive, or is
     *                                  anything other than the static value 1 while reducing
     * @throws IllegalStateException    if a non-list target did not yield exactly one output
     * @throws RuntimeException         the aggregate produced by {@code Exceptions.propagate} when any
     *                                  replay check or sub-workflow failed
     */
    private Object runSubworkflowsWithConcurrency(WorkflowStepInstanceExecutionContext context, List<WorkflowExecutionContext> nestedWorkflowContexts, Map reducingV, boolean wasList, boolean isReplaying, ReplayContinuationInstructions instructionsIfReplaying) {
        LOG.debug("Running sub-workflows "+nestedWorkflowContexts);
        if (nestedWorkflowContexts.isEmpty()) return reducingV!=null ? reducingV : MutableList.of();

        // effective concurrency; stays at 1 (sequential) unless a concurrency is configured for a list target
        long ci = 1;
        Object c = concurrency;
        if (c != null && wasList) {
            if (reducingV!=null) {
                // if reducing, force concurrency 1, and also disallow dynamic concurrency (to prevent things that work in tests with dynamic value resolving to 1 but fail in real life when not 1)
                Maybe<Integer> cm = TypeCoercions.tryCoerce(c, Integer.class);
                if (cm.isAbsent() || cm.get()!=1)
                    throw new IllegalArgumentException("Concurrency cannot be used unless static value 1 when reducing");
                ci = 1;
            } else {
                c = context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_RUNNING, c, Object.class);
                if (c instanceof Number) {
                    // okay
                } else if (c instanceof String) {
                    // a string is parsed into a function of the number of sub-workflows
                    c = WorkflowConcurrencyParser.parse((String) c).apply((double) nestedWorkflowContexts.size());
                } else {
                    throw new IllegalArgumentException("Unsupported concurrency object: '" + c + "'");
                }
                // round down, with a small tolerance so values a hair below an integer are not truncated
                ci = (long) Math.floor(0.000001 + ((Number) c).doubleValue());
                if (ci <= 0)
                    throw new IllegalArgumentException("Invalid concurrency value: " + ci + " (concurrency " + c + ", target size " + nestedWorkflowContexts.size() + ")");
            }
        }

        // null means "no limiting needed" (sequential); otherwise counts the remaining slots.
        // ci is a long and may exceed the int range; a plain (int) cast could wrap to zero or a negative
        // number, which would make the wait loop below spin forever, so clamp before narrowing.
        AtomicInteger availableThreads = ci == 1 ? null : new AtomicInteger((int) Math.min(ci, Integer.MAX_VALUE));
        List<Task<?>> submitted = MutableList.of();
        List<Pair<WorkflowExecutionContext,Task<?>>> delayedBecauseReducing = MutableList.of();
        WorkflowExecutionContext lastWorkflowRunBeforeReplay = null;
        List<Throwable> errors = MutableList.of();
        for (int i = 0; i < nestedWorkflowContexts.size(); i++) {
            // block until a slot is free; the timed wait re-checks periodically in case a notification is missed
            if (availableThreads != null) while (availableThreads.get() <= 0) {
                try {
                    Tasks.withBlockingDetails("Waiting before running remaining " + (nestedWorkflowContexts.size() - i) + " instances because " + ci + " are currently running",
                            () -> {
                                synchronized (availableThreads) {
                                    availableThreads.wait(500);
                                }
                                return null;
                            });
                } catch (Exception e) {
                    throw Exceptions.propagate(e);
                }
            }

            Task<Object> task;
            if (!isReplaying) {
                task = nestedWorkflowContexts.get(i).getTask(false).get();
            } else {
                try {
                    Pair<Boolean, Object> check = WorkflowReplayUtils.checkReplayResumingInSubWorkflowAlsoReturningTaskOrResult("nested workflow " + (i + 1), context, nestedWorkflowContexts.get(i), instructionsIfReplaying,
                            (w, e) -> {
                                throw new IllegalStateException("Sub workflow " + w + " is not replayable", e);
                            }, false);
                    if (check.getLeft()) {
                        task = (Task<Object>) check.getRight();
                    } else {
                        // completed, skip run; workflow output will be set and used by caller (so can ignore check.getRight() here)
                        task = null;
                        // unless we are reducing, in which case we need to track which one ran last
                        lastWorkflowRunBeforeReplay = nestedWorkflowContexts.get(i);
                    }
                } catch (Exception e) {
                    // remember the failure and carry on checking the others; reported after the loop
                    errors.add(e);
                    task = null;
                }
            }
            if (task != null) {
                if (Entities.isUnmanagingOrNoLongerManaged(context.getEntity())) {
                    // on shutdown don't keep running tasks
                    task.cancel(false);
                } else {
                    if (reducingV!=null) {
                        // run later, one at a time, so each can see the values left by the previous one
                        delayedBecauseReducing.add(Pair.of(nestedWorkflowContexts.get(i), task));
                    } else {
                        if (availableThreads != null) {
                            // take a slot now; the end callback gives it back and wakes the waiting loop
                            availableThreads.decrementAndGet();
                            ((EntityInternal) context.getEntity()).getExecutionContext().submit(MutableMap.of("newTaskEndCallback", (Runnable) () -> {
                                        availableThreads.incrementAndGet();
                                        synchronized (availableThreads) {
                                            availableThreads.notifyAll();
                                        }
                                    }),
                                    task);
                        }
                        DynamicTasks.queue(task);
                        submitted.add(task);
                    }
                }
            }
        }

        if (reducingV==null) {
            assert delayedBecauseReducing.isEmpty();
            submitted.forEach(t -> {
                try {
                    if (!t.isSubmitted() && !errors.isEmpty()) {
                        // if concurrent, all tasks will be submitted, and we should wait;
                        // if not, then there might be queued tasks not yet submitted; if there are errors, they will never be submitted
                        return;
                    }
                    t.get();
                } catch (Throwable tt) {
                    errors.add(tt);
                }
            });
        }

        // Report failures in both modes. When reducing, the only possible entries are replay-check failures
        // gathered in the loop above; they must stop the step before any reducing sub-workflow is started,
        // otherwise the failed sub-workflow would be silently left out of the reduction.
        if (!errors.isEmpty()) {
            throw Exceptions.propagate("Error"+(errors.size()>1 ? "s" : "")+" running sub-workflow"+(nestedWorkflowContexts.size()>1 ? "s" : "")+" in "+context.getWorkflowStepReference(), errors);
        }

        if (reducingV!=null) {
            // if reducing we need to wrap each execution to get/set last values

            assert submitted.isEmpty();
            if (lastWorkflowRunBeforeReplay!=null) {
                // if interrupted we need to explicitly take from the last step
                reducingV = updateReducingWorkflowVarsFromLastStep(lastWorkflowRunBeforeReplay, reducingV);
            }
            for (Pair<WorkflowExecutionContext, Task<?>> p : delayedBecauseReducing) {
                WorkflowExecutionContext wc = p.getLeft();
                if (wc.getCurrentStepIndex()==null || wc.getCurrentStepIndex()==WorkflowExecutionContext.STEP_INDEX_FOR_START) {
                    // initialize to last if it hasn't started
                    wc.updateWorkflowScratchVariables(reducingV);
                }

                // run to completion before moving on; a failure here propagates immediately
                DynamicTasks.queue(p.getRight()).getUnchecked();

                reducingV = updateReducingWorkflowVarsFromLastStep(wc, reducingV);
            }
        }

        Object returnValue;
        if (reducingV==null) {
            List result = MutableList.of();
            nestedWorkflowContexts.forEach(nw -> result.add(nw.getOutput()));
            if (!wasList && result.size() != 1) {
                throw new IllegalStateException("Result mismatch, non-list target " + target + " yielded output " + result);
            }
            // the step output is always the list, even when the single element is what gets returned
            context.setOutput(result);
            returnValue = !wasList ? Iterables.getOnlyElement(result) : result;
        } else {
            context.setOutput(reducingV);
            // also publish the final reducing values to the enclosing workflow's scratch variables
            context.getWorkflowExectionContext().updateWorkflowScratchVariables(reducingV);
            returnValue = reducingV;
        }

        return returnValue;
    }