in flux/src/main/java/software/amazon/aws/clients/swf/flux/poller/DecisionTaskPoller.java [775:839]
/**
 * Converts a single signal into the list of decisions needed to act on it.
 *
 * <p>Handling per signal type:
 * <ul>
 *   <li>{@code DELAY_RETRY}: adds a cancel-timer decision for the signal's activity id, followed by a
 *       SIGNAL_EXTERNAL_WORKFLOW_EXECUTION decision built from {@code buildScheduleRetrySignalAttrs}.</li>
 *   <li>{@code SCHEDULE_DELAYED_RETRY}: adds a START_TIMER decision using the signal's delay, unless
 *       {@code state.getOpenTimers()} already has an entry for the activity id, in which case the signal
 *       is ignored.</li>
 *   <li>{@code RETRY_NOW}: adds a cancel-timer decision plus a decision that forces a new decision task.</li>
 *   <li>{@code FORCE_RESULT}: adds a cancel-timer decision only when a timer is open for the activity id.</li>
 *   <li>Any other type: logged at warn level; no decisions are produced.</li>
 * </ul>
 * For the first three types, whenever decisions are produced a count of 1 is added to {@code metrics}
 * under the name returned by {@code formatSignalProcessedForActivityMetricName}.
 *
 * @param state        workflow state; consulted here only for its open timers (and handed to
 *                     {@code buildScheduleRetrySignalAttrs})
 * @param signal       the signal to handle; cast to the concrete data type matching its signal type
 * @param partitionId  passed through to {@code buildStartTimerDecisionAttrs} when a timer is started
 * @param activityName used when formatting the metric name
 * @param metrics      recorder that receives the per-signal count
 * @return the decisions to make for this signal; empty when the signal is ignored
 * @throws JsonProcessingException declared by this method; presumably raised while building the
 *                                 schedule-retry signal attributes (helper not visible here)
 */
private static List<Decision> handleSignal(WorkflowState state, BaseSignalData signal,
                                           String partitionId, String activityName, MetricRecorder metrics)
        throws JsonProcessingException {
    List<Decision> result = new LinkedList<>();

    switch (signal.getSignalType()) {
        case DELAY_RETRY: {
            // Step one: cancel the existing timer.
            result.add(buildCancelTimerDecision(signal.getActivityId()));

            // Step two: emit a signal asking for the timer to be recreated with the requested delay.
            SignalExternalWorkflowExecutionDecisionAttributes retrySignalAttrs
                    = buildScheduleRetrySignalAttrs(state, (DelayRetrySignalData) signal);
            result.add(Decision.builder()
                    .decisionType(DecisionType.SIGNAL_EXTERNAL_WORKFLOW_EXECUTION)
                    .signalExternalWorkflowExecutionDecisionAttributes(retrySignalAttrs)
                    .build());

            log.debug("Signaling for activity {} to be re-scheduled due to a {} signal.",
                      signal.getActivityId(), signal.getSignalType().getFriendlyName());
            metrics.addCount(formatSignalProcessedForActivityMetricName(activityName, signal.getSignalType()), 1);
            break;
        }
        case SCHEDULE_DELAYED_RETRY: {
            // A timer that is still open for this activity means there is nothing to schedule.
            if (state.getOpenTimers().containsKey(signal.getActivityId())) {
                log.debug("Ignoring signal {} because the timer is still open.", signal.getSignalType().getFriendlyName());
                break;
            }

            // No timer is open for this activity, so start one with the delay carried by the signal.
            int delaySeconds = ((ScheduleDelayedRetrySignalData) signal).getDelayInSeconds();
            StartTimerDecisionAttributes retryTimerAttrs
                    = buildStartTimerDecisionAttrs(signal.getActivityId(), delaySeconds, partitionId);
            result.add(Decision.builder()
                    .decisionType(DecisionType.START_TIMER)
                    .startTimerDecisionAttributes(retryTimerAttrs)
                    .build());

            // Completes the trailing "second{}" placeholder: singular for exactly 1, plural otherwise.
            String pluralSuffix = (delaySeconds == 1) ? "." : "s.";
            log.debug("Setting retry timer for activity {} due to a {} signal, with new delay of {} second{}",
                      signal.getActivityId(), signal.getSignalType().getFriendlyName(), delaySeconds,
                      pluralSuffix);
            metrics.addCount(formatSignalProcessedForActivityMetricName(activityName, signal.getSignalType()), 1);
            break;
        }
        case RETRY_NOW: {
            result.add(buildCancelTimerDecision(signal.getActivityId()));
            // Cancelling a timer does not by itself cause another decision task to be scheduled, so the
            // retry would never get scheduled; force a new decision task to make that happen.
            result.add(decisionToForceNewDecision());

            log.debug("Immediately retrying activity {} due to {} signal.",
                      signal.getActivityId(), signal.getSignalType().getFriendlyName());
            metrics.addCount(formatSignalProcessedForActivityMetricName(activityName, signal.getSignalType()), 1);
            break;
        }
        case FORCE_RESULT: {
            // Only cancel when there actually is an open timer for this activity.
            boolean timerIsOpen = state.getOpenTimers().containsKey(signal.getActivityId());
            if (timerIsOpen) {
                result.add(buildCancelTimerDecision(signal.getActivityId()));
            }
            break;
        }
        default: {
            log.warn("Ignoring signal with unknown type: {}", signal.getSignalType());
            break;
        }
    }

    return result;
}