in flux/src/main/java/software/amazon/aws/clients/swf/flux/poller/ActivityTaskPoller.java [121:167]
private Runnable pollForActivityTask(MetricRecorder metrics) {
    // Polls SWF for one activity task on this poller's task list.
    // Returns null when the poll produced no task; otherwise returns a Runnable that hands the task,
    // its workflow and its step to executeWithHeartbeat.
    // The supplied MetricRecorder is closed on every exit path (see the finally block).
    try {
        // Stop the thread-availability wait timer, then record the same duration a second time
        // under a metric name suffixed with this poller's task list name.
        final Duration threadWait = metrics.endDuration(WORKER_THREAD_AVAILABILITY_WAIT_TIME_METRIC_NAME);
        metrics.addDuration(WORKER_THREAD_AVAILABILITY_WAIT_TIME_METRIC_NAME + "." + taskListName, threadWait);

        final TaskList pollTaskList = TaskList.builder().name(taskListName).build();
        final PollForActivityTaskRequest pollRequest = PollForActivityTaskRequest.builder()
                .domain(domain)
                .taskList(pollTaskList)
                .identity(identity)
                .build();

        // NOTE(review): 20 and Duration.ofSeconds(2) are presumably the retry limit and backoff base
        // of RetryUtils.executeWithInlineBackoff -- confirm against that helper.
        final PollForActivityTaskResponse polled = RetryUtils.executeWithInlineBackoff(
                () -> swf.pollForActivityTask(pollRequest),
                20, Duration.ofSeconds(2), metrics, ACTIVITY_TASK_POLL_TIME_METRIC_PREFIX);

        // A missing response or a missing/empty task token is treated as "nothing to do".
        final String token = (polled == null) ? null : polled.taskToken();
        if (token == null || token.isEmpty()) {
            metrics.addCount(NO_ACTIVITY_TASK_TO_EXECUTE_METRIC_NAME, 1.0);
            return null;
        }

        final String activityName = polled.activityType().name();

        // The activity name must map to a registered step.
        final WorkflowStep matchedStep = workflowSteps.get(activityName);
        if (matchedStep == null) {
            metrics.addCount(formatUnrecognizedActivityTaskMetricName(activityName), 1.0);
            final String warning = "Activity task received for unrecognized activity: " + activityName;
            log.warn(warning);
            throw new UnrecognizedTaskException(warning);
        }

        // The workflow name derived from the activity name must map to a registered workflow.
        final Workflow owningWorkflow = workflows.get(TaskNaming.workflowNameFromActivityName(activityName));
        if (owningWorkflow == null) {
            final String error = "Activity " + activityName
                    + " was a valid activity but somehow not a valid workflow, this should not be possible!";
            log.error(error);
            throw new IllegalStateException(error);
        }

        log.debug("Polled for activity task and there was work to do.");
        return () -> executeWithHeartbeat(polled, owningWorkflow, matchedStep);
    } catch (UnrecognizedTaskException | IllegalStateException passThrough) {
        // These two types are rethrown without the extra warning below; the throw sites above
        // log them before throwing.
        throw passThrough;
    } catch (Throwable unexpected) {
        log.warn("Got an unexpected exception when polling for an activity task.", unexpected);
        throw unexpected;
    } finally {
        metrics.close();
    }
}