in flux/src/main/java/software/amazon/aws/clients/swf/flux/wf/graph/WorkflowGraphBuilder.java [562:646]
/**
 * Recursively validates that every attribute requested by a step is available when the step runs.
 *
 * <p>For the given step this checks, in order:
 * <ol>
 *   <li>if the step is a {@link PartitionedWorkflowStep}, the parameters of its
 *       {@link PartitionIdGenerator} method (which may not request PARTITION_ID or PARTITION_COUNT);</li>
 *   <li>the parameters of its {@link StepApply} method, which may additionally request the special
 *       attributes (initial attempt time, retry attempt and, for partitioned steps, partition id/count);</li>
 *   <li>the PRE and POST hooks registered for the step's class;</li>
 *   <li>every non-null successor node, recursively, using the attributes available after this step.</li>
 * </ol>
 *
 * <p>The {@code availableAttributes} map is treated as read-only: any attributes contributed by this step
 * are added to private copies, so that sibling branches sharing the caller's map are not affected by
 * each other's outputs.
 *
 * @param step                the step whose attribute requests are validated
 * @param nodes               graph nodes keyed by step class; used to find the step's successors
 * @param availableAttributes attribute names and types available before this step runs (not modified)
 * @param stepHooks           hooks keyed by the step class they are attached to
 * @throws WorkflowGraphBuildException if the partition id generator requests PARTITION_ID or PARTITION_COUNT
 */
private void validateAttributeAvailability(WorkflowStep step,
        Map<Class<? extends WorkflowStep>, WorkflowGraphNode> nodes,
        Map<String, Class<?>> availableAttributes,
        Map<Class<? extends WorkflowStep>, List<WorkflowStepHook>> stepHooks) {
    Class<? extends WorkflowStep> stepClass = step.getClass();
    String stepName = stepClass.getSimpleName();
    boolean stepIsPartitioned = PartitionedWorkflowStep.class.isAssignableFrom(stepClass);

    // Attributes visible to this step's apply method and PRE hooks. For non-partitioned steps this is
    // exactly the caller's map; for partitioned steps it is a copy extended with the declared outputs.
    Map<String, Class<?>> stepAttributes = availableAttributes;

    if (stepIsPartitioned) {
        Method partitionIdGenerator = WorkflowStepUtil.getUniqueAnnotatedMethod(stepClass, PartitionIdGenerator.class);
        for (Parameter param : partitionIdGenerator.getParameters()) {
            if (param.getType().isAssignableFrom(MetricRecorder.class)) {
                continue;
            }
            // NOTE(review): assumes every remaining parameter carries @Attribute (getAnnotation returns
            // null otherwise) -- confirm this is enforced before this method is called.
            Attribute attr = param.getAnnotation(Attribute.class);
            if (StepAttributes.PARTITION_ID.equals(attr.value()) || StepAttributes.PARTITION_COUNT.equals(attr.value())) {
                String msg = String.format("%s.%s cannot request the PARTITION_ID or PARTITION_COUNT attributes.",
                        stepName, partitionIdGenerator.getName());
                throw new WorkflowGraphBuildException(msg);
            }
            Class<?> paramType = param.getType();
            if (paramType == Date.class) {
                // The step attribute encoding/decoding logic allows Date and Instant to be used
                // interchangeably, so for the purposes of attribute availability,
                // we'll coerce all Dates to Instants, to simplify the validation logic.
                paramType = Instant.class;
            }
            // The generator itself is validated against the incoming attributes only,
            // i.e. before the step's declared outputs are added.
            validateAttributeIdAndType(stepName, partitionIdGenerator.getName(), attr.value(),
                    paramType, attr.optional(), availableAttributes);
        }
        // Add the declared outputs to a copy rather than to the caller's map: during recursion that map
        // is shared by all successors of the previous step, so mutating it would leak this step's
        // outputs into sibling branches.
        stepAttributes = new HashMap<>(availableAttributes);
        addDeclaredOutputAttributes(stepAttributes, step);
    }

    Map<String, Class<?>> withSpecialAttributes = new HashMap<>(stepAttributes);
    withSpecialAttributes.put(StepAttributes.ACTIVITY_INITIAL_ATTEMPT_TIME,
            StepAttributes.getSpecialAttributeType(StepAttributes.ACTIVITY_INITIAL_ATTEMPT_TIME));
    withSpecialAttributes.put(StepAttributes.RETRY_ATTEMPT,
            StepAttributes.getSpecialAttributeType(StepAttributes.RETRY_ATTEMPT));
    if (stepIsPartitioned) {
        withSpecialAttributes.put(StepAttributes.PARTITION_ID,
                StepAttributes.getSpecialAttributeType(StepAttributes.PARTITION_ID));
        withSpecialAttributes.put(StepAttributes.PARTITION_COUNT,
                StepAttributes.getSpecialAttributeType(StepAttributes.PARTITION_COUNT));
    }

    Method applyMethod = WorkflowStepUtil.getUniqueAnnotatedMethod(stepClass, StepApply.class);
    for (Parameter param : applyMethod.getParameters()) {
        Class<?> paramType = param.getType();
        if (paramType.isAssignableFrom(MetricRecorder.class)) {
            continue;
        }
        if (paramType == Date.class) {
            // The step attribute encoding/decoding logic allows Date and Instant to be used
            // interchangeably, so for the purposes of attribute availability,
            // we'll coerce all Dates to Instants, to simplify the validation logic.
            paramType = Instant.class;
        }
        // NOTE(review): same @Attribute-presence assumption as for the partition id generator above.
        Attribute attr = param.getAnnotation(Attribute.class);
        validateAttributeIdAndType(stepName, applyMethod.getName(), attr.value(), paramType,
                attr.optional(), withSpecialAttributes);
    }

    // Attributes visible to POST hooks and to the next steps. A partitioned step's declared outputs
    // are already part of stepAttributes, so they are only added here for non-partitioned steps.
    Map<String, Class<?>> nextStepAttributes = new HashMap<>(stepAttributes);
    if (!stepIsPartitioned) {
        addDeclaredOutputAttributes(nextStepAttributes, step);
    }

    if (stepHooks.containsKey(stepClass)) {
        for (WorkflowStepHook hook : stepHooks.get(stepClass)) {
            validateHookAttributeAvailability(step, hook, StepHook.HookType.PRE, stepAttributes, stepIsPartitioned);
            validateHookAttributeAvailability(step, hook, StepHook.HookType.POST, nextStepAttributes, stepIsPartitioned);
        }
    }

    // NOTE(review): the recursion has no visited-set; this presumably relies on the graph being
    // acyclic -- confirm against the rest of the builder.
    for (WorkflowGraphNode node : nodes.get(stepClass).getNextStepsByResultCode().values()) {
        if (node == null) {
            // this means the workflow ends for that result code, move on to the next.
            continue;
        }
        validateAttributeAvailability(node.getStep(), nodes, nextStepAttributes, stepHooks);
    }
}