protected Object doTaskBody()

in core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/RetryWorkflowStep.java [244:361]


    /**
     * Runs the body of a retry step.
     * <p>
     * In order, this method:
     * <ol>
     *   <li>looks up (or creates) the list of previous retry timestamps, keyed by the {@code HASH} input
     *       or, when that is blank, by the reference of the current workflow step;</li>
     *   <li>fails if any configured {@code LIMIT} reports that it has been reached;</li>
     *   <li>fails if the time since the oldest recorded retry is longer than {@link #getMaximumRetryTimeout()};</li>
     *   <li>computes the {@code BACKOFF} delay (if any) and sleeps for it;</li>
     *   <li>records the current instant as a new retry;</li>
     *   <li>sets {@code context.next} either to replay instructions or to the resolved explicit next step.</li>
     * </ol>
     *
     * @param context the execution context of this step instance
     * @return the output of the previous step, as given by {@code context.getPreviousStepOutput()}
     * @throws RetriesExceeded if one of the configured limits has been reached
     * @throws IllegalStateException if replay is disabled and no next step is available
     */
    protected Object doTaskBody(WorkflowStepInstanceExecutionContext context) {

        String hash = Strings.firstNonBlank(context.getInput(HASH), context.getWorkflowExectionContext().getWorkflowStepReference(Tasks.current()));
        List<Instant> retries = context.getWorkflowExectionContext().getRetryRecords().computeIfAbsent(hash, k -> MutableList.of());

        // --- limits: abort as soon as any limit says it has been reached
        List<RetryLimit> limit = context.getInput(LIMIT);
        if (limit!=null) {
            for (RetryLimit l : limit) {
                Maybe<String> reachedMessage = l.isReached(retries);
                if (reachedMessage.isPresent()) throw new RetriesExceeded(reachedMessage.get(), context.getError());
            }
        }

        // --- overall timeout: measured from the oldest recorded retry
        Duration t = getMaximumRetryTimeout();
        if (t!=null) {
            Instant oldest = retries.stream().min(Instant::compareTo).orElse(null);
            if (oldest != null) {
                Duration sinceFirst = Duration.between(oldest, Instant.now());
                if (sinceFirst.isLongerThan(t)) {
                    throw Exceptions.propagate(new TimeoutException("Workflow duration of "+sinceFirst+" exceeds timeout of "+t).initCause(context.getError()));
                }
            }
        }

        // --- backoff: work out how long to wait before this retry, then wait
        RetryBackoff backoff = context.getInput(BACKOFF);
        if (backoff!=null) {
            Duration delay;
            // number of retries beyond those covered by the explicit `initial` delays
            int exponent = 0;
            if (backoff.initial!=null && !backoff.initial.isEmpty()) {
                if (backoff.initial.size() > retries.size()) {
                    delay = backoff.initial.get(retries.size());
                } else {
                    delay = backoff.initial.get(backoff.initial.size()-1);
                    exponent = 1 + retries.size() - backoff.initial.size();
                }
            } else {
                // shouldn't be possible
                delay = Duration.ZERO;
            }
            if (backoff.factor!=null) {
                // use a dedicated counter: `exponent` must keep its value for the additive increase below
                // (decrementing it in place left it at -1, which turned the increase into a decrease)
                for (int i = 0; i < exponent; i++) delay = delay.multiply(backoff.factor);
            }
            if (backoff.increase!=null) delay = delay.add(backoff.increase.multiply(exponent));

            if (backoff.jitter!=null) delay = delay.multiply(1 + Math.random()*backoff.jitter);
            if (backoff.max!=null && delay.isLongerThan(backoff.max)) {
                delay = backoff.max;
                // also apply a sigmoidal heuristic if jitter requested
                if (backoff.jitter!=null) delay = delay.multiply(1 / (1+Math.random()*backoff.jitter));
            }

            if (delay.isPositive()) {
                // effectively-final copy for use inside the lambda
                Duration ddelay = delay;
                String waitMessage = "Waiting " + delay + " before retry #" + (retries.size() + 1);
                try {
                    Tasks.withBlockingDetails(waitMessage, () -> {
                        log.debug(waitMessage);
                        Time.sleep(ddelay);
                        return null;
                    });
                } catch (Exception e) {
                    throw Exceptions.propagate(e);
                }
            }
        }

        // --- record this retry; the list is put back explicitly after being modified
        retries.add(Instant.now());
        context.getWorkflowExectionContext().getRetryRecords().put(hash, retries);

        // --- decide where to go next
        // this step counts as being in an error handler when it is not the workflow's current step instance
        boolean inErrorHandler = !context.equals(context.getWorkflowExectionContext().getCurrentStepInstance());
        RetryReplayOption replay = context.getInput(REPLAY);
        String next = this.next;
        if (replay==null) {
            // default: replay unless an explicit next step was given
            replay = next==null ? RetryReplayOption.TRUE : RetryReplayOption.FALSE;
            if (next==null) next = inErrorHandler ? STEP_TARGET_NAME_FOR_END : STEP_TARGET_NAME_FOR_LAST;
        } else if (next==null) {
            next = STEP_TARGET_NAME_FOR_LAST;
        }

        if (replay!=RetryReplayOption.FALSE) {
            boolean forceReplay = replay == RetryReplayOption.FORCE;
            context.next = null;
            if (STEP_TARGET_NAME_FOR_END.equals(next)) {
                if (!inErrorHandler) {
                    log.warn("Retry target `"+STEP_TARGET_NAME_FOR_END+"` is only permitted inside an error handler; using `"+STEP_TARGET_NAME_FOR_LAST+"` instead");
                    next = STEP_TARGET_NAME_FOR_LAST;
                } else {
                    context.next = context.getWorkflowExectionContext().factory(true).makeInstructionsForReplayResuming(
                            "Retry replay from '" + next + "' per step " + context.getWorkflowExectionContext().getWorkflowStepReference(Tasks.current()), forceReplay);
                }
            }
            if (context.next==null) {
                if (STEP_TARGET_NAME_FOR_LAST.equals(next)) {
                    int lastReplayStep = context.getWorkflowExectionContext().getReplayableLastStep() != null ? context.getWorkflowExectionContext().getReplayableLastStep() : WorkflowExecutionContext.STEP_INDEX_FOR_START;
                    if (!inErrorHandler && context.getStepIndex() == lastReplayStep) {
                        // can't replay from retry step
                        lastReplayStep = WorkflowReplayUtils.findNearestReplayPoint(context.getWorkflowExectionContext(), lastReplayStep, false);
                    }
                    context.next = context.getWorkflowExectionContext().factory(true).makeInstructionsForReplayingFromStep(lastReplayStep,
                            "Retry replay per step " + context.getWorkflowExectionContext().getWorkflowStepReference(Tasks.current()), forceReplay);
                    // could offer retry resuming but that is often not wanted; instead do that if `next` is `end`

                } else {
                    // `next` names a step id: replay from the index of that step
                    context.next = context.getWorkflowExectionContext().factory(true).makeInstructionsForReplayingFromStep(context.getWorkflowExectionContext().getIndexOfStepId(next).get().getLeft(),
                            "Retry replay from '" + next + "' per step " + context.getWorkflowExectionContext().getWorkflowStepReference(Tasks.current()), forceReplay);
                }
            }
            log.debug("Retrying with "+context.next);
        } else {
            if (next==null) {
                // defensive: the defaulting above always assigns `next` on this path
                throw new IllegalStateException("Cannot retry with replay disabled and no specified next");
            } else {
                // will go to next by id
                context.next = context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_RUNNING, next);
                log.debug("Retrying from explicit next step '"+context.next+"'");
            }
        }
        return context.getPreviousStepOutput();
    }