in flux/src/main/java/software/amazon/aws/clients/swf/flux/poller/ActivityExecutor.java [85:166]
public void run() {
    // Processes the activity task held by this executor:
    //   1. decodes the task input and runs the step's PRE hooks,
    //   2. if the PRE hooks produced no result, runs the activity and then the POST hooks,
    //   3. stores the outcome in 'result' and the matching payload in 'output'
    //      (both are assigned here but declared outside this method).
    String activityName = task.activityType().name();
    log.debug("Worker {} received activity task for activity {} id {}.", identity, activityName,
              task.activityId());

    String workflowId = task.workflowExecution().workflowId();
    String runId = task.workflowExecution().runId();

    // The recorder and both MDC entries are closed automatically when the try block exits.
    try (MetricRecorder recorder = metricsFactory.newMetricRecorder(activityName);
         MDC.MDCCloseable ignoredWorkflowIdMdc = MDC.putCloseable(WORKFLOW_ID_METRIC_NAME, workflowId);
         MDC.MDCCloseable ignoredRunIdMdc = MDC.putCloseable(WORKFLOW_RUN_ID_METRIC_NAME, runId)
    ) {
        recorder.addProperty(WORKFLOW_ID_METRIC_NAME, workflowId);
        recorder.addProperty(WORKFLOW_RUN_ID_METRIC_NAME, runId);

        Map<String, String> taskInput = StepAttributes.decode(Map.class, task.input());
        List<WorkflowStepHook> stepHooks = workflow.getGraph().getHooksForStep(step.getClass());
        boolean hasHooks = stepHooks != null;

        // Hooks see a copy of the task input plus the (encoded) activity name.
        Map<String, String> hookAttributes = new HashMap<>(taskInput);
        hookAttributes.put(StepAttributes.ACTIVITY_NAME, StepAttributes.encode(activityName));

        // PRE hooks are skipped for the post-workflow anchor step.
        if (hasHooks && step.getClass() != PostWorkflowHookAnchor.class) {
            result = WorkflowStepUtil.executeHooks(stepHooks, hookAttributes, StepHook.HookType.PRE, activityName,
                                                   fluxMetrics, recorder);
        }

        Map<String, String> completionAttributes = new HashMap<>(taskInput);

        // The activity only runs when no result has been set by this point.
        if (result == null) {
            result = ActivityExecutionUtil.executeActivity(step, activityName, fluxMetrics, recorder, taskInput);

            // The result attributes are serialized value-by-value into a map which is itself serialized
            // later on. That double serialization keeps deserialization simple: read a map of strings
            // first, then deserialize each value as its specific type.
            Map<String, String> serializedAttributes = StepAttributes.serializeMapValues(result.getAttributes());
            completionAttributes.putAll(serializedAttributes);
            hookAttributes.putAll(serializedAttributes);

            // Retries carry their reason message in the special ActivityTaskFailed reason field instead,
            // so the completion message is only recorded for non-retry results.
            if (result.getAction() != StepResult.ResultAction.RETRY) {
                String completionMessage = result.getMessage();
                if (completionMessage != null) {
                    completionAttributes.put(StepAttributes.ACTIVITY_COMPLETION_MESSAGE, completionMessage);
                    hookAttributes.put(StepAttributes.ACTIVITY_COMPLETION_MESSAGE,
                                       StepAttributes.encode(completionMessage));
                }
            }

            String resultCode = result.getResultCode();
            if (resultCode != null) {
                completionAttributes.put(StepAttributes.RESULT_CODE, resultCode);
                hookAttributes.put(StepAttributes.RESULT_CODE, StepAttributes.encode(resultCode));
            }

            // POST hooks are skipped for the pre-workflow anchor step.
            if (hasHooks && step.getClass() != PreWorkflowHookAnchor.class) {
                StepResult postHookResult = WorkflowStepUtil.executeHooks(stepHooks, hookAttributes,
                                                                          StepHook.HookType.POST, activityName,
                                                                          fluxMetrics, recorder);
                if (postHookResult != null) {
                    // A non-null POST hook result replaces the activity's own result, so the activity's
                    // completion message and result code are dropped from the output.
                    log.info("Activity {} returned result {} ({}) but a post-step hook requires a retry ({}).",
                             activityName, result.getResultCode(), result.getMessage(),
                             postHookResult.getMessage());
                    result = postHookResult;
                    completionAttributes.remove(StepAttributes.ACTIVITY_COMPLETION_MESSAGE);
                    completionAttributes.remove(StepAttributes.RESULT_CODE);
                }
            }
        }

        boolean retryRequested = result != null && result.getAction() == StepResult.ResultAction.RETRY;
        if (!retryRequested) {
            output = StepAttributes.encode(completionAttributes);
        } else if (result.getCause() == null) {
            // A retry without an exception gets a blank output. It will be ignored anyway.
            output = null;
        } else {
            // A retry caused by an exception records that exception's stack trace as the output.
            StringWriter traceBuffer = new StringWriter();
            result.getCause().printStackTrace(new PrintWriter(traceBuffer));
            output = traceBuffer.toString();
        }
    } catch (RuntimeException failure) {
        // java.lang.Thread's default behavior (print to stdout) has been suppressed, so the error is
        // logged here before it is propagated.
        log.error("Caught an exception while executing activity task", failure);
        throw failure;
    }
}