in core/src/main/java/org/apache/brooklyn/core/workflow/steps/flow/RetryWorkflowStep.java [244:361]
protected Object doTaskBody(WorkflowStepInstanceExecutionContext context) {
    /*
     * Runs one execution of the retry step. In order, this:
     *  1. looks up the retry history (a list of Instants) recorded under this step's hash,
     *     where the hash is the HASH input or, if blank, the workflow step reference of the current task;
     *  2. throws RetriesExceeded if any configured LIMIT reports it has been reached;
     *  3. throws a (propagated) TimeoutException if the oldest recorded retry is further in the past
     *     than getMaximumRetryTimeout();
     *  4. sleeps for the delay derived from the BACKOFF input, if any;
     *  5. records this retry and sets context.next to either replay instructions or an explicit next step.
     *
     * Returns the previous step's output, unchanged.
     */
    String hash = Strings.firstNonBlank(context.getInput(HASH), context.getWorkflowExectionContext().getWorkflowStepReference(Tasks.current()));
    List<Instant> retries = context.getWorkflowExectionContext().getRetryRecords().compute(hash, (k, v) -> v != null ? v : MutableList.of());

    // --- limits: fail as soon as any one of them reports it has been reached
    List<RetryLimit> limit = context.getInput(LIMIT);
    if (limit != null) {
        limit.forEach(l -> {
            Maybe<String> reachedMessage = l.isReached(retries);
            if (reachedMessage.isPresent()) throw new RetriesExceeded(reachedMessage.get(), context.getError());
        });
    }

    // --- overall timeout, measured from the oldest recorded retry
    Duration t = getMaximumRetryTimeout();
    if (t != null) {
        Instant oldest = retries.stream().min(Instant::compareTo).orElse(null);
        if (oldest != null) {
            Duration sinceFirst = Duration.between(oldest, Instant.now());
            if (sinceFirst.isLongerThan(t)) {
                throw Exceptions.propagate(new TimeoutException("Workflow duration of "+sinceFirst+" exceeds timeout of "+t).initCause(context.getError()));
            }
        }
    }

    // --- backoff: compute the delay before this retry and wait for it
    RetryBackoff backoff = context.getInput(BACKOFF);
    if (backoff != null) {
        Duration delay;
        // number of retries made beyond the end of the `initial` list; 0 while still inside that list
        int exponent = 0;
        if (backoff.initial != null && !backoff.initial.isEmpty()) {
            if (backoff.initial.size() > retries.size()) {
                delay = backoff.initial.get(retries.size());
            } else {
                delay = backoff.initial.get(backoff.initial.size() - 1);
                exponent = 1 + retries.size() - backoff.initial.size();
            }
        } else {
            // shouldn't be possible
            delay = Duration.ZERO;
        }
        if (backoff.factor != null) {
            // use a separate counter: `exponent` must keep its value because the `increase` term below needs it
            // (previously a `while (exponent-- > 0)` loop left it at -1, so the increase was subtracted)
            for (int i = 0; i < exponent; i++) delay = delay.multiply(backoff.factor);
        }
        if (backoff.increase != null) delay = delay.add(backoff.increase.multiply(exponent));
        if (backoff.jitter != null) delay = delay.multiply(1 + Math.random()*backoff.jitter);
        if (backoff.max != null && delay.isLongerThan(backoff.max)) {
            delay = backoff.max;
            // also apply a sigmoidal heuristic if jitter requested
            if (backoff.jitter != null) delay = delay.multiply(1 / (1+Math.random()*backoff.jitter));
        }
        if (delay.isPositive()) {
            final Duration ddelay = delay;
            final String waitMessage = "Waiting " + ddelay + " before retry #" + (retries.size() + 1);
            try {
                Tasks.withBlockingDetails(waitMessage, () -> {
                    log.debug(waitMessage);
                    Time.sleep(ddelay);
                    return null;
                });
            } catch (Exception e) {
                throw Exceptions.propagate(e);
            }
        }
    }

    // --- record this retry
    retries.add(Instant.now());
    context.getWorkflowExectionContext().getRetryRecords().put(hash, retries);

    // --- decide where to go: replay (from last replay point / end / a named step) or jump to an explicit next step
    // this step instance differs from the workflow's current step instance when running inside an error handler
    boolean inErrorHandler = !context.equals(context.getWorkflowExectionContext().getCurrentStepInstance());
    RetryReplayOption replay = context.getInput(REPLAY);
    String next = this.next;
    if (replay == null) {
        // default: replay unless an explicit `next` was given
        replay = next == null ? RetryReplayOption.TRUE : RetryReplayOption.FALSE;
        if (next == null) next = inErrorHandler ? STEP_TARGET_NAME_FOR_END : STEP_TARGET_NAME_FOR_LAST;
    } else if (next == null) {
        next = STEP_TARGET_NAME_FOR_LAST;
    }

    if (replay != RetryReplayOption.FALSE) {
        context.next = null;
        if (STEP_TARGET_NAME_FOR_END.equals(next)) {
            if (!inErrorHandler) {
                log.warn("Retry target `"+STEP_TARGET_NAME_FOR_END+"` is only permitted inside an error handler; using `"+STEP_TARGET_NAME_FOR_LAST+"` instead");
                next = STEP_TARGET_NAME_FOR_LAST;
            } else {
                context.next = context.getWorkflowExectionContext().factory(true).makeInstructionsForReplayResuming(
                        "Retry replay from '" + next + "' per step " + context.getWorkflowExectionContext().getWorkflowStepReference(Tasks.current()), replay == RetryReplayOption.FORCE);
            }
        }
        if (context.next == null) {
            if (STEP_TARGET_NAME_FOR_LAST.equals(next)) {
                int lastReplayStep = context.getWorkflowExectionContext().getReplayableLastStep() != null ? context.getWorkflowExectionContext().getReplayableLastStep() : WorkflowExecutionContext.STEP_INDEX_FOR_START;
                if (!inErrorHandler) {
                    if (context.getStepIndex() == lastReplayStep) {
                        // can't replay from retry step
                        lastReplayStep = WorkflowReplayUtils.findNearestReplayPoint(context.getWorkflowExectionContext(), lastReplayStep, false);
                    }
                }
                context.next = context.getWorkflowExectionContext().factory(true).makeInstructionsForReplayingFromStep(lastReplayStep,
                        "Retry replay per step " + context.getWorkflowExectionContext().getWorkflowStepReference(Tasks.current()), replay == RetryReplayOption.FORCE);
                // could offer retry resuming but that is often not wanted; instead do that if `next` is `end`
            } else {
                // replay from the step whose id is `next`
                context.next = context.getWorkflowExectionContext().factory(true).makeInstructionsForReplayingFromStep(context.getWorkflowExectionContext().getIndexOfStepId(next).get().getLeft(),
                        "Retry replay from '" + next + "' per step " + context.getWorkflowExectionContext().getWorkflowStepReference(Tasks.current()), replay == RetryReplayOption.FORCE);
            }
        }
        log.debug("Retrying with "+context.next);
    } else {
        if (next == null) {
            // defensive guard; `next` is always defaulted above when it was not specified
            throw new IllegalStateException("Cannot retry with replay disabled and no specified next");
        } else {
            // will go to next by id
            context.next = context.resolve(WorkflowExpressionResolution.WorkflowExpressionStage.STEP_RUNNING, next);
            log.debug("Retrying from explicit next step '"+context.next+"'");
        }
    }
    return context.getPreviousStepOutput();
}