private Runnable pollForActivityTask()

in flux/src/main/java/software/amazon/aws/clients/swf/flux/poller/ActivityTaskPoller.java [121:167]


    private Runnable pollForActivityTask(MetricRecorder metrics) {
        try {
            Duration waitTime = metrics.endDuration(WORKER_THREAD_AVAILABILITY_WAIT_TIME_METRIC_NAME);
            // emit the wait time metric again, under this poller's task list name.
            metrics.addDuration(WORKER_THREAD_AVAILABILITY_WAIT_TIME_METRIC_NAME + "." + taskListName, waitTime);

            PollForActivityTaskRequest request = PollForActivityTaskRequest.builder()
                    .domain(domain).taskList(TaskList.builder().name(taskListName).build()).identity(identity).build();

            PollForActivityTaskResponse task
                    = RetryUtils.executeWithInlineBackoff(() -> swf.pollForActivityTask(request),
                                                          20, Duration.ofSeconds(2), metrics,
                                                          ACTIVITY_TASK_POLL_TIME_METRIC_PREFIX);

            if (task == null || task.taskToken() == null || task.taskToken().equals("")) {
                // this means there was no work to do
                metrics.addCount(NO_ACTIVITY_TASK_TO_EXECUTE_METRIC_NAME, 1.0);
                return null;
            }

            WorkflowStep step = workflowSteps.get(task.activityType().name());
            if (step == null) {
                metrics.addCount(formatUnrecognizedActivityTaskMetricName(task.activityType().name()), 1.0);
                String message = "Activity task received for unrecognized activity: " + task.activityType().name();
                log.warn(message);
                throw new UnrecognizedTaskException(message);
            }

            Workflow workflow = workflows.get(TaskNaming.workflowNameFromActivityName(task.activityType().name()));
            if (workflow == null) {
                String message = "Activity " + task.activityType().name()
                                 + " was a valid activity but somehow not a valid workflow, this should not be possible!";
                log.error(message);
                throw new IllegalStateException(message);
            }

            log.debug("Polled for activity task and there was work to do.");
            return () -> executeWithHeartbeat(task, workflow, step);
        } catch (UnrecognizedTaskException | IllegalStateException e) {
            throw e;
        } catch (Throwable e) {
            log.warn("Got an unexpected exception when polling for an activity task.", e);
            throw e;
        } finally {
            metrics.close();
        }
    }