in src/main/java/com/amazonaws/services/simpleworkflow/flow/worker/ActivityTaskPoller.java [185:272]
public void execute(ActivityTask task) throws Exception {
String output = null;
ActivityType activityType = task.getActivityType();
final Metrics metrics = metricsRegistry.newMetrics(MetricName.Operation.EXECUTE_ACTIVITY_TASK.getName());
MetricHelper.recordMetrics(task, metrics);
final Metrics childMetrics = metrics.newMetrics();
boolean activityResultSubmitted = false;
ActivityExecutionContext context = new ActivityExecutionContextImpl(service, domain, task, config, metricsRegistry);
ActivityTypeExecutionOptions executionOptions = null;
try {
try {
ActivityImplementation activityImplementation = activityImplementationFactory.getActivityImplementation(activityType);
if (activityImplementation == null) {
metrics.recordCount(MetricName.TYPE_NOT_FOUND.getName(), 1, MetricName.getActivityTypeDimension(activityType));
Iterable<ActivityType> typesToRegister = activityImplementationFactory.getActivityTypesToRegister();
StringBuilder types = new StringBuilder();
types.append("[");
for (ActivityType t : typesToRegister) {
if (types.length() > 1) {
types.append(", ");
}
types.append(t);
}
types.append("]");
throw new ActivityFailureException("Activity type \"" + activityType
+ "\" is not supported by the ActivityWorker. "
+ "Possible cause is activity type version change without changing task list name. "
+ "Activity types registered with the worker are: " + types);
}
ThreadLocalMetrics.setCurrent(childMetrics);
executionOptions = activityImplementation.getExecutionOptions();
output = childMetrics.recordCallable(() -> activityImplementation.execute(context), activityType.getName(),
TimeUnit.MILLISECONDS);
} catch (CancellationException e) {
respondActivityTaskCanceledWithRetry(task.getTaskToken(), null, executionOptions);
activityResultSubmitted = true;
return;
} catch (ActivityFailureException e) {
if (log.isErrorEnabled()) {
log.error("Failure processing activity task with taskId=" + task.getStartedEventId() + ", workflowGenerationId="
+ task.getWorkflowExecution().getWorkflowId() + ", activity=" + activityType + ", activityInstanceId="
+ task.getActivityId(), e);
}
respondActivityTaskFailedWithRetry(task.getTaskToken(), e.getReason(), e.getDetails(), executionOptions);
activityResultSubmitted = true;
return;
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Failure processing activity task with taskId=" + task.getStartedEventId() + ", workflowGenerationId="
+ task.getWorkflowExecution().getWorkflowId() + ", activity=" + activityType + ", activityInstanceId="
+ task.getActivityId(), e);
}
String reason = e.getMessage();
StringWriter sw = new StringWriter();
e.printStackTrace(new PrintWriter(sw));
String details = sw.toString();
if (details.length() > FlowValueConstraint.FAILURE_DETAILS.getMaxSize()) {
metrics.recordCount(MetricName.RESPONSE_TRUNCATED.getName(), 1, MetricName.getActivityTypeDimension(activityType));
log.warn("Length of details is over maximum input length of 32768. Actual details: " + details +
"when processing activity task with taskId=" + task.getStartedEventId() + ", workflowId="
+ task.getWorkflowExecution().getWorkflowId() + ", activity=" + activityType + ", activityInstanceId="
+ task.getActivityId());
details = WorkflowExecutionUtils.truncateDetails(details);
}
respondActivityTaskFailedWithRetry(task.getTaskToken(), reason, details, executionOptions);
activityResultSubmitted = true;
return;
} finally {
childMetrics.close();
ThreadLocalMetrics.setCurrent(metrics);
}
if (executionOptions == null || !executionOptions.isManualActivityCompletion()) {
respondActivityTaskCompletedWithRetry(task.getTaskToken(), output, executionOptions);
activityResultSubmitted = true;
}
} finally {
metrics.recordCount(MetricName.DROPPED_TASK.getName(), !activityResultSubmitted, MetricName.getActivityTypeDimension(activityType));
metrics.close();
ThreadLocalMetrics.clearCurrent();
}
}