in flux/src/main/java/software/amazon/aws/clients/swf/flux/poller/DecisionTaskPoller.java [468:773]
/**
 * Builds the decisions needed to schedule (or reschedule) the partitions of {@code nextStep}.
 *
 * <p>If {@code nextStep} is partitioned and no partition metadata has been recorded for it yet (and no partition
 * has a recorded attempt), this generates the partition ids and returns ONLY the RecordMarker decision(s) holding
 * the partition metadata plus a decision that forces another decision task; the partitions themselves are
 * scheduled on that later decision. Recording the marker separately keeps activity-scheduling throttling from
 * affecting the marker and leaves more of the decision response size for the partitions.
 *
 * <p>Otherwise, for each partition this either skips it (it already has a result, or its latest attempt has not
 * finished), reacts to a pending signal for the next attempt, starts a retry timer, or schedules the next attempt.
 * Before the first attempt of a new step, open retry timers of the previous step are cancelled.
 *
 * @param workflow the workflow definition being decided
 * @param workflowId the workflow id, used for logging and partition id generation
 * @param nextStep the step to schedule
 * @param state the workflow state used to look up partition states, timers and signals
 * @param nextStepInput the base input for the step. NOTE: this map is mutated; partition metadata attributes are
 *                      merged into it, and partition-specific keys may be removed from it
 * @param exponentialBackoffBase base passed to {@code RetryUtils.calculateRetryBackoffInSeconds} for retry delays
 * @param fluxMetrics metric recorder passed through to signal handling
 * @param metricsFactory metrics factory passed through to partition id generation
 * @return the decisions to send back; never null, possibly empty
 * @throws JsonProcessingException declared for the serialization done by helpers called here
 *         (presumably partition metadata / activity input encoding; TODO confirm which ones throw)
 * @throws BadWorkflowStateException if a non-partitioned step that already has a result would be rescheduled,
 *         or if a partitioned step has neither partition metadata nor any successfully-scheduled partition
 */
private static List<Decision> handleStepScheduling(Workflow workflow, String workflowId, WorkflowStep nextStep,
                                                   WorkflowState state, Map<String, String> nextStepInput,
                                                   double exponentialBackoffBase, MetricRecorder fluxMetrics,
                                                   MetricRecorderFactory metricsFactory) throws JsonProcessingException {
    List<Decision> decisions = new LinkedList<>();
    String workflowName = TaskNaming.workflowName(workflow);
    String nextStepName = TaskNaming.stepName(nextStep);
    String nextActivityName = TaskNaming.activityName(workflowName, nextStepName);
    boolean nextStepIsPartitioned = nextStep instanceof PartitionedWorkflowStep;
    PartitionMetadata nextStepPartitionMetadata = state.getPartitionMetadata(nextStepName);
    // Nothing below mutates the workflow state before the per-partition loop, so one lookup suffices here.
    Map<String, PartitionState> existingPartitionStates = state.getLatestPartitionStates(nextActivityName);

    if (partitionMetadataMustBeGenerated(nextStepIsPartitioned, nextStepPartitionMetadata,
                                         existingPartitionStates)) {
        // Record the metadata marker(s) now and schedule the partitions on the forced follow-up decision.
        return buildPartitionMetadataMarkerDecisions((PartitionedWorkflowStep) nextStep, nextStepName,
                                                     nextStepInput, workflowName, workflowId, metricsFactory);
    }

    // Determine which partition ids to consider. A non-partitioned step is modelled as one null partition id.
    Set<String> partitionIds;
    if (!nextStepIsPartitioned) {
        partitionIds = Collections.singleton(null);
    } else if (nextStepPartitionMetadata != null) {
        partitionIds = nextStepPartitionMetadata.getPartitionIds();
    } else {
        // No partition metadata, yet we did not need to generate it: at least one partition has a recorded
        // attempt. Fall back to the partitions we have state for (backward compatibility with executions that
        // started before partition metadata markers existed).
        partitionIds = existingPartitionStates.keySet();
    }

    // Edge cases for partitioned steps:
    // 1) A partition metadata marker exists: use its additional attributes.
    // 2) No marker, but every partition had at least one successfully-scheduled attempt: proceed without it.
    // 3) No marker and no partition was ever successfully scheduled: metadata was generated above and we
    //    already returned, so that case never reaches this point.
    // 4) No marker, some partitions were rejected by SWF but at least one was scheduled: proceed without the
    //    marker, recovering the additional attributes from a scheduled partition's input.
    // Case 4 could abandon the workflow instead, but letting it continue lets users force it into a rollback or
    // recovery path rather than having to terminate it.
    if (nextStepPartitionMetadata != null) {
        nextStepInput.putAll(nextStepPartitionMetadata.getEncodedAdditionalAttributes());
    } else if (nextStepIsPartitioned) {
        // partitionMetadataMustBeGenerated() returned false, so at least one partition state is non-null.
        // Fail loudly rather than with a bare NPE if that invariant is ever broken.
        PartitionState scheduledPartition = existingPartitionStates.values().stream()
                .filter(Objects::nonNull)
                .findFirst()
                .orElseThrow(() -> new BadWorkflowStateException(String.format(
                        "Workflow %s step %s has no partition metadata and no successfully-scheduled partitions.",
                        workflowId, nextActivityName)));
        for (Map.Entry<String, String> e : scheduledPartition.getAttemptInput().entrySet()) {
            nextStepInput.putIfAbsent(e.getKey(), e.getValue());
        }
        // Those inputs belonged to one specific partition attempt; drop the partition-specific entries.
        nextStepInput.remove(StepAttributes.PARTITION_ID);
        nextStepInput.remove(StepAttributes.RETRY_ATTEMPT);
    }

    for (String partitionId : partitionIds) {
        PartitionState lastAttempt = state.getLatestPartitionStates(nextActivityName).get(partitionId);
        if (lastAttempt != null && lastAttempt.getResultCode() != null) {
            // Non-partitioned steps should not get here.
            if (!nextStepIsPartitioned) {
                String msg = String.format("We cannot reschedule a non-partitioned step %s.%s that already has a result %s.",
                                           workflowName, nextActivityName, lastAttempt.getResultCode());
                log.error(msg);
                throw new BadWorkflowStateException(msg);
            }
            log.info("Workflow {} step {}.{} partition {} already completed ({}), not rescheduling.",
                     workflowId, workflowName, nextActivityName, partitionId, lastAttempt.getResultCode());
            continue;
        }
        boolean retrying = false;
        long attemptNumber = 0L;
        if (lastAttempt != null && lastAttempt.getAttemptResult() == null) {
            // The last attempt was already scheduled (possibly even started) but hasn't finished yet.
            // Just move on to the next partition.
            continue;
        } else if (lastAttempt != null) {
            // This is a retry: the next attempt number is one past the last attempt's RETRY_ATTEMPT input.
            String encoded = lastAttempt.getAttemptInput().get(StepAttributes.RETRY_ATTEMPT);
            if (encoded != null) {
                attemptNumber = StepAttributes.decode(Long.class, encoded);
            }
            attemptNumber += 1L;
            retrying = true;
        } else { // lastAttempt == null
            // We can get here if we tried to schedule the first attempt of a step
            // but SWF gave us a ScheduleActivityTaskFailed event.
            // In this case we need to try again.
            retrying = true;
        }
        String activityId = TaskNaming.createActivityId(nextStep, attemptNumber, partitionId);
        boolean hasForceResultSignal = false;
        if (state.getSignalsByActivityId().containsKey(activityId)
                && state.getSignalsByActivityId().get(activityId).getSignalType() == SignalType.FORCE_RESULT) {
            hasForceResultSignal = true;
            retrying = false;
        }
        // Now we need to make a decision for this attempt.
        // If it's not the first attempt, we need to check timers for this activityId.
        // - If a timer is open, there's nothing to do yet unless we got a RetryNow or DelayRetry signal,
        //   or the workflow was canceled.
        // - If no timer is open and no timer has fired, add a StartTimer decision.
        // - Otherwise, schedule the next attempt.
        if (attemptNumber > 0) {
            // this is a retry if we get in here.
            BaseSignalData signal = state.getSignalsByActivityId().get(activityId);
            if (state.getOpenTimers().containsKey(activityId)) {
                if (signal != null) {
                    // The retry timer id matches the next attempt's activity id, which matches the signal's activity id.
                    // Ignore signals older than the event that started the currently open retry timer.
                    TimerData openTimer = state.getOpenTimers().get(activityId);
                    if (signal.getSignalEventId() > openTimer.getStartTimerEventId()) {
                        decisions.addAll(handleSignal(state, signal, partitionId, nextActivityName, fluxMetrics));
                    }
                    // If the signal was not ForceResult, we don't want to make any more decisions for this partition.
                    // If it was ForceResult, we may or may not need to schedule the next step; the code below decides.
                    if (signal.getSignalType() != SignalType.FORCE_RESULT) {
                        continue;
                    }
                } else {
                    log.debug("Processed a decision for workflow {} activity {} but there was still an open timer,"
                              + " no action taken.", workflowId, activityId);
                    // continue to the next partition, we don't want to make any more decisions for this one
                    continue;
                }
            } else if (signal != null) { // if there isn't an open timer but we have a ScheduleDelayedRetry signal
                if (signal.getSignalType() == SignalType.SCHEDULE_DELAYED_RETRY) {
                    // Ignore the signal if it is older than the close event of the last retry timer.
                    if (!state.getClosedTimers().containsKey(activityId)
                            || signal.getSignalEventId() > state.getClosedTimers().get(activityId)) {
                        decisions.addAll(handleSignal(state, signal, partitionId, nextActivityName, fluxMetrics));
                        // continue to the next partition, we don't want to make any more decisions for this one
                        continue;
                    }
                } else {
                    log.debug("Processed a decision for workflow {} activity {} but there was no open timer,"
                              + " no action taken.", workflowId, activityId);
                    // no continue here, we may want to schedule a normal retry timer, or the next step.
                }
            }
            // If the retry timer was never started (neither open nor closed), start it now.
            if (!state.getClosedTimers().containsKey(activityId) && !state.getOpenTimers().containsKey(activityId)) {
                long delayInSeconds = RetryUtils.calculateRetryBackoffInSeconds(nextStep, attemptNumber,
                                                                                exponentialBackoffBase);
                StartTimerDecisionAttributes attrs = buildStartTimerDecisionAttrs(activityId, delayInSeconds, partitionId);
                Decision decision = Decision.builder().decisionType(DecisionType.START_TIMER)
                        .startTimerDecisionAttributes(attrs)
                        .build();
                log.debug("Workflow {} will have activity {} scheduled after a delay of {} seconds.",
                          workflowId, activityId, delayInSeconds);
                decisions.add(decision);
                // continue to the next partition, we don't want to make any more decisions for this one
                continue;
            }
        } else if (!retrying && state.getCurrentActivityName() != null) {
            // We're about to schedule the first attempt of the next step. The previous step may have ended due to
            // a ForceResult signal while one of its retry timers was still open, so cancel any such timers.
            addCancelDecisionsForPreviousStepRetryTimers(state, decisions);
        }
        // if we got a ForceResult signal for this partition attempt, we've probably just cancelled the timer,
        // so we don't want to schedule a retry for it.
        if (hasForceResultSignal) {
            continue;
        }
        // If we get this far, we know we're scheduling a run of nextStep.
        // First let's populate the attempt-specific fields into the next step input map.
        Map<String, String> actualInput = new TreeMap<>(nextStepInput);
        if (attemptNumber > 0L) {
            actualInput.put(StepAttributes.RETRY_ATTEMPT, Long.toString(attemptNumber));
        }
        if (partitionId != null) {
            actualInput.put(StepAttributes.PARTITION_ID, StepAttributes.encode(partitionId));
            actualInput.put(StepAttributes.PARTITION_COUNT, Long.toString(partitionIds.size()));
        }
        Instant firstAttemptDate = state.getStepInitialAttemptTime(nextActivityName);
        if (firstAttemptDate == null) {
            firstAttemptDate = Instant.now();
        }
        actualInput.put(StepAttributes.ACTIVITY_INITIAL_ATTEMPT_TIME, StepAttributes.encode(firstAttemptDate));
        ScheduleActivityTaskDecisionAttributes attrs
                = buildScheduleActivityTaskDecisionAttrs(workflow, nextStep, actualInput, activityId);
        // We'll save the partition id in the control field for convenience in debugging and testing,
        // and to reference when rebuilding partition state to avoid having to inspect the step's input attributes.
        attrs = attrs.toBuilder().control(partitionId).build();
        Decision decision = Decision.builder().decisionType(DecisionType.SCHEDULE_ACTIVITY_TASK)
                .scheduleActivityTaskDecisionAttributes(attrs)
                .build();
        decisions.add(decision);
        log.debug("Workflow {} will have activity {} scheduled for execution.", workflowId, activityId);
    }
    return decisions;
}

/**
 * Decides whether partition metadata must be generated (and recorded) before a step's partitions are scheduled.
 *
 * <p>This is true only for a partitioned step that has no partition metadata yet and for which no partition has
 * a recorded attempt. That covers a brand-new step (empty map) and the case where SWF rejected the first attempt
 * of every partition (all values null).
 *
 * @param stepIsPartitioned whether the step is a {@code PartitionedWorkflowStep}
 * @param existingMetadata partition metadata already recorded for the step, or null
 * @param latestPartitionStates latest state per partition id; a null value means no attempt was ever scheduled
 * @return true if a partition metadata marker must be generated first
 */
private static boolean partitionMetadataMustBeGenerated(boolean stepIsPartitioned,
                                                        PartitionMetadata existingMetadata,
                                                        Map<String, PartitionState> latestPartitionStates) {
    if (!stepIsPartitioned || existingMetadata != null) {
        return false;
    }
    // allMatch is vacuously true for an empty map, which is exactly the brand-new-step case.
    return latestPartitionStates.values().stream().allMatch(Objects::isNull);
}

/**
 * Generates partition ids for {@code partitionedStep} and builds the decisions that record them.
 *
 * <p>The returned list holds one RecordMarker decision per chunk produced by
 * {@code PartitionMetadata.toMarkerDetailsList()}, followed by a decision that forces another decision task.
 *
 * @param partitionedStep the partitioned step whose partition ids are generated
 * @param stepName the step name used to build the marker names
 * @param stepInput the step input handed to the partition id generator
 * @param workflowName the workflow name handed to the partition id generator
 * @param workflowId the workflow id handed to the partition id generator
 * @param metricsFactory the metrics factory handed to the partition id generator
 * @return a new mutable list of marker decisions followed by the force-new-decision decision
 * @throws JsonProcessingException declared for the serialization done by the helpers called here (TODO confirm)
 */
private static List<Decision> buildPartitionMetadataMarkerDecisions(PartitionedWorkflowStep partitionedStep,
                                                                    String stepName,
                                                                    Map<String, String> stepInput,
                                                                    String workflowName, String workflowId,
                                                                    MetricRecorderFactory metricsFactory)
        throws JsonProcessingException {
    PartitionIdGeneratorResult result = WorkflowStepUtil.getPartitionIdsForPartitionedStep(partitionedStep,
                                                                                           stepInput, workflowName,
                                                                                           workflowId, metricsFactory);
    List<String> markerDetailsList = PartitionMetadata.fromPartitionIdGeneratorResult(result).toMarkerDetailsList();
    int markerCount = markerDetailsList.size();
    List<Decision> decisions = new LinkedList<>();
    for (int i = 0; i < markerCount; i++) {
        RecordMarkerDecisionAttributes markerAttrs = RecordMarkerDecisionAttributes.builder()
                .markerName(TaskNaming.partitionMetadataMarkerName(stepName, i, markerCount))
                .details(markerDetailsList.get(i))
                .build();
        decisions.add(Decision.builder().decisionType(DecisionType.RECORD_MARKER)
                              .recordMarkerDecisionAttributes(markerAttrs)
                              .build());
    }
    decisions.add(decisionToForceNewDecision());
    return decisions;
}

/**
 * Appends a CancelTimer decision for every open retry timer of the previous step, i.e. the step named by
 * {@code state.getCurrentActivityName()} (must be non-null).
 *
 * <p>It is not known which partition's timer may be open, so every partition of the previous step is checked.
 * The timer id checked for a partition is the activity id of that partition's next attempt.
 *
 * @param state the workflow state holding the previous step's partition states and the open timers
 * @param decisions the decision list to append cancel decisions to
 */
private static void addCancelDecisionsForPreviousStepRetryTimers(WorkflowState state, List<Decision> decisions) {
    String previousActivityName = state.getCurrentActivityName();
    String previousStepName = TaskNaming.stepNameFromActivityName(previousActivityName);
    Map<String, PartitionState> previousPartitions = state.getLatestPartitionStates(previousActivityName);
    for (Map.Entry<String, PartitionState> previousPartition : previousPartitions.entrySet()) {
        PartitionState previousState = previousPartition.getValue();
        if (previousState == null) {
            // No attempt of this partition was ever scheduled. Retry timers are only started once a partition
            // has a previous attempt, so there is nothing to cancel (and no attempt number to build an id from).
            continue;
        }
        String retryTimerId = TaskNaming.createActivityId(previousStepName, previousState.getRetryAttempt() + 1,
                                                          previousPartition.getKey());
        if (state.getOpenTimers().containsKey(retryTimerId)) {
            decisions.add(buildCancelTimerDecision(retryTimerId));
        }
    }
}