public WorkflowGraphBuilder addStep()

in flux/src/main/java/software/amazon/aws/clients/swf/flux/wf/graph/WorkflowGraphBuilder.java [103:192]


    public WorkflowGraphBuilder addStep(WorkflowStep step) {
        if (step == null) {
            throw new WorkflowGraphBuildException("Cannot add a null step.");
        }

        if (CloseWorkflow.class.isAssignableFrom(step.getClass())) {
            throw new WorkflowGraphBuildException("CloseWorkflow must not be implemented.");
        }

        if (stepImpls.containsKey(step.getClass())) {
            throw new WorkflowGraphBuildException("WorkflowStep classes can only be used once per workflow.");
        }

        Method applyMethod;
        try {
            applyMethod = WorkflowStepUtil.getUniqueAnnotatedMethod(step.getClass(), StepApply.class);
        } catch (Exception e) {
            throw new WorkflowGraphBuildException(e.getMessage(), e);
        }

        boolean requestsPartitionId = false;
        boolean requestsPartitionCount = false;

        for (Parameter param : applyMethod.getParameters()) {
            if (param.getType().isAssignableFrom(MetricRecorder.class)) {
                continue;
            }

            Attribute attr = validateAttributeAnnotationPresent(step, applyMethod, param);
            if (StepAttributes.PARTITION_ID.equals(attr.value())) {
                requestsPartitionId = true;
                if (!String.class.equals(param.getType())) {
                    throw new WorkflowGraphBuildException("The partition id attribute must be of type String.");
                }
            } else if (StepAttributes.PARTITION_COUNT.equals(attr.value())) {
                if (!Long.class.equals(param.getType())) {
                    throw new WorkflowGraphBuildException("The partition count attribute must be of type Long.");
                }
                requestsPartitionCount = true;
            } else {
                StepAttributes.validateAttributeClass(param.getType());
            }
        }

        if (PartitionedWorkflowStep.class.isAssignableFrom(step.getClass())) {
            if (!requestsPartitionId) {
                String message = String.format("Partitioned workflow step %s must have a parameter for the partition id.",
                                               step.getClass().getSimpleName());
                throw new WorkflowGraphBuildException(message);
            }

            // Ensure that the return type of the method is List<> or PartitionIdGeneratorResult.
            try {
                Method partitionIdGeneratorMethod = WorkflowStepUtil.getUniqueAnnotatedMethod(step.getClass(),
                                                                                              PartitionIdGenerator.class);
                if (!List.class.equals(partitionIdGeneratorMethod.getReturnType())
                        && !PartitionIdGeneratorResult.class.equals(partitionIdGeneratorMethod.getReturnType())) {
                    throw new WorkflowGraphBuildException(String.format("%s.%s must have a return type of List<String>"
                                                                        + " or PartitionIdGeneratorResult.",
                                                                        step.getClass().getSimpleName(),
                                                                        partitionIdGeneratorMethod.getName()));
                }

                for (Parameter param : partitionIdGeneratorMethod.getParameters()) {
                    if (param.getType().isAssignableFrom(MetricRecorder.class)) {
                        continue;
                    }

                    Attribute attr = validateAttributeAnnotationPresent(step, partitionIdGeneratorMethod, param);
                    if (StepAttributes.PARTITION_ID.equals(attr.value()) || StepAttributes.PARTITION_COUNT.equals(attr.value())) {
                        String msg = String.format("%s.%s cannot request the PARTITION_ID or PARTITION_COUNT attributes.",
                                                   step.getClass().getSimpleName(), partitionIdGeneratorMethod.getName());
                        throw new WorkflowGraphBuildException(msg);
                    } else {
                        StepAttributes.validateAttributeClass(param.getType());
                    }
                }
            } catch (Exception e) {
                throw new WorkflowGraphBuildException(e.getMessage(), e);
            }
        } else if (requestsPartitionId || requestsPartitionCount) { // step is not partitioned
            String message = String.format("Workflow step %s is not partitioned but requested partition id or partition count.",
                                           step.getClass().getSimpleName());
            throw new WorkflowGraphBuildException(message);
        }

        stepImpls.put(step.getClass(), step);

        return this;
    }