in flux-common/src/main/java/software/amazon/aws/clients/swf/flux/poller/ActivityExecutionUtil.java [34:74]
/**
 * Reflectively invokes the {@link StepApply}-annotated method of the given step and converts
 * the outcome into a {@link StepResult}, emitting exactly one count on {@code fluxMetrics}.
 *
 * <p>Outcome mapping:
 * <ul>
 *   <li>The apply method returns normally: the result is {@code StepResult.success()}, unless the
 *       method's declared return type is {@code StepResult} and it returned a non-null value,
 *       in which case that value is used as-is.</li>
 *   <li>The apply method throws: the exception (unwrapped from the
 *       {@link InvocationTargetException}) becomes a retry result.</li>
 *   <li>The reflective call fails with {@link IllegalAccessException}: that exception becomes
 *       a retry result.</li>
 * </ul>
 *
 * <p>Only the two exception types above are caught here; anything else thrown while locating
 * the method or generating its arguments propagates to the caller.
 *
 * @param step         the step whose apply method should be invoked
 * @param activityName name used when formatting the emitted metric name
 * @param fluxMetrics  recorder that receives the single retry/completion count
 * @param stepMetrics  recorder handed to argument generation for the apply method
 * @param input        input map handed to argument generation for the apply method
 * @return the result produced by (or derived from) the step's apply method
 */
public static StepResult executeActivity(WorkflowStep step, String activityName, MetricRecorder fluxMetrics,
                                         MetricRecorder stepMetrics, Map<String, String> input) {
    StepResult outcome;
    // Stays null unless an exception forced the retry; used to pick the metric below.
    String failureName = null;

    try {
        Method stepApply = WorkflowStepUtil.getUniqueAnnotatedMethod(step.getClass(), StepApply.class);
        Object returned = stepApply.invoke(step, WorkflowStepUtil.generateArguments(step.getClass(), stepApply,
                                                                                    stepMetrics, input));

        // A normal return means success by default...
        outcome = StepResult.success();
        // ...but a method declared to return StepResult gets the final say when it returns one.
        if (returned != null && stepApply.getReturnType() == StepResult.class) {
            outcome = (StepResult) returned;
        }
    } catch (InvocationTargetException e) {
        // The step itself threw; the real exception is the ITE's cause. Every such failure is retried.
        Throwable thrownByStep = e.getCause();
        failureName = thrownByStep.getClass().getSimpleName();
        outcome = StepResult.retry(thrownByStep);
        log.info("Step {} threw an exception ({}), returning a retry result.",
                 step.getClass().getSimpleName(), thrownByStep.toString(),
                 thrownByStep);
    } catch (IllegalAccessException e) {
        // Not expected, because only public methods are looked up; if it happens anyway,
        // let the workflow retry the step.
        failureName = e.getClass().getSimpleName();
        outcome = StepResult.retry(e);
        log.error("Got an exception ({}) attempting to execute step {}, returning a retry result.",
                  e.getMessage(), step.getClass().getSimpleName(), e);
    }

    // Choose the one metric to emit: exception-driven retry, explicit retry, or completion.
    String metricName;
    if (failureName != null) {
        metricName = formatRetryResultMetricName(activityName, failureName);
    } else if (outcome.getAction() == StepResult.ResultAction.RETRY) {
        metricName = formatRetryResultMetricName(activityName, null);
    } else {
        metricName = formatCompletionResultMetricName(activityName, outcome.getResultCode());
    }
    fluxMetrics.addCount(metricName, 1.0);

    return outcome;
}