in flux/src/main/java/software/amazon/aws/clients/swf/flux/wf/graph/WorkflowGraphBuilder.java [103:192]
public WorkflowGraphBuilder addStep(WorkflowStep step) {
if (step == null) {
throw new WorkflowGraphBuildException("Cannot add a null step.");
}
if (CloseWorkflow.class.isAssignableFrom(step.getClass())) {
throw new WorkflowGraphBuildException("CloseWorkflow must not be implemented.");
}
if (stepImpls.containsKey(step.getClass())) {
throw new WorkflowGraphBuildException("WorkflowStep classes can only be used once per workflow.");
}
Method applyMethod;
try {
applyMethod = WorkflowStepUtil.getUniqueAnnotatedMethod(step.getClass(), StepApply.class);
} catch (Exception e) {
throw new WorkflowGraphBuildException(e.getMessage(), e);
}
boolean requestsPartitionId = false;
boolean requestsPartitionCount = false;
for (Parameter param : applyMethod.getParameters()) {
if (param.getType().isAssignableFrom(MetricRecorder.class)) {
continue;
}
Attribute attr = validateAttributeAnnotationPresent(step, applyMethod, param);
if (StepAttributes.PARTITION_ID.equals(attr.value())) {
requestsPartitionId = true;
if (!String.class.equals(param.getType())) {
throw new WorkflowGraphBuildException("The partition id attribute must be of type String.");
}
} else if (StepAttributes.PARTITION_COUNT.equals(attr.value())) {
if (!Long.class.equals(param.getType())) {
throw new WorkflowGraphBuildException("The partition count attribute must be of type Long.");
}
requestsPartitionCount = true;
} else {
StepAttributes.validateAttributeClass(param.getType());
}
}
if (PartitionedWorkflowStep.class.isAssignableFrom(step.getClass())) {
if (!requestsPartitionId) {
String message = String.format("Partitioned workflow step %s must have a parameter for the partition id.",
step.getClass().getSimpleName());
throw new WorkflowGraphBuildException(message);
}
// Ensure that the return type of the method is List<> or PartitionIdGeneratorResult.
try {
Method partitionIdGeneratorMethod = WorkflowStepUtil.getUniqueAnnotatedMethod(step.getClass(),
PartitionIdGenerator.class);
if (!List.class.equals(partitionIdGeneratorMethod.getReturnType())
&& !PartitionIdGeneratorResult.class.equals(partitionIdGeneratorMethod.getReturnType())) {
throw new WorkflowGraphBuildException(String.format("%s.%s must have a return type of List<String>"
+ " or PartitionIdGeneratorResult.",
step.getClass().getSimpleName(),
partitionIdGeneratorMethod.getName()));
}
for (Parameter param : partitionIdGeneratorMethod.getParameters()) {
if (param.getType().isAssignableFrom(MetricRecorder.class)) {
continue;
}
Attribute attr = validateAttributeAnnotationPresent(step, partitionIdGeneratorMethod, param);
if (StepAttributes.PARTITION_ID.equals(attr.value()) || StepAttributes.PARTITION_COUNT.equals(attr.value())) {
String msg = String.format("%s.%s cannot request the PARTITION_ID or PARTITION_COUNT attributes.",
step.getClass().getSimpleName(), partitionIdGeneratorMethod.getName());
throw new WorkflowGraphBuildException(msg);
} else {
StepAttributes.validateAttributeClass(param.getType());
}
}
} catch (Exception e) {
throw new WorkflowGraphBuildException(e.getMessage(), e);
}
} else if (requestsPartitionId || requestsPartitionCount) { // step is not partitioned
String message = String.format("Workflow step %s is not partitioned but requested partition id or partition count.",
step.getClass().getSimpleName());
throw new WorkflowGraphBuildException(message);
}
stepImpls.put(step.getClass(), step);
return this;
}