private void executeWithHeartbeat()

in flux/src/main/java/software/amazon/aws/clients/swf/flux/poller/ActivityTaskPoller.java [169:268]


    private void executeWithHeartbeat(PollForActivityTaskResponse task, Workflow workflow, WorkflowStep step) {
        try (MetricRecorder metrics = metricsFactory.newMetricRecorder(this.getClass().getSimpleName()
                                                                       + ".executeWithHeartbeat")) {
            String activityExecutionTimeMetricName = formatActivityExecutionTimeMetricName(task.activityType().name());
            metrics.startDuration(activityExecutionTimeMetricName);
            ActivityExecutor executor = new ActivityExecutor(identity, task, workflow, step, metrics, metricsFactory);

            Thread activityThread = new Thread(executor);
            activityThread.start();

            boolean canceled = false;

            try {
                while (true) {
                    activityThread.join(HEARTBEAT_INTERVAL.toMillis());
                    if (!activityThread.isAlive()) {
                        log.debug("The activity thread has ended successfully.");
                        break;
                    }

                    try {
                        RecordActivityTaskHeartbeatRequest request
                                = RecordActivityTaskHeartbeatRequest.builder().taskToken(task.taskToken()).build();
                        log.debug("Sending heartbeat for workflow id {} at step {} with activity id {}.",
                                  task.workflowExecution().workflowId(), TaskNaming.activityName(workflow, step),
                                  task.activityId());
                        RecordActivityTaskHeartbeatResponse status = swf.recordActivityTaskHeartbeat(request);
                        log.debug("Recorded heartbeat for workflow id {} at step {} with activity id {}.",
                                  task.workflowExecution().workflowId(), TaskNaming.activityName(workflow, step),
                                  task.activityId());
                        if (status.cancelRequested()) {
                            log.warn("The heartbeat told us that we received a cancellation request for this task.");
                            canceled = true;
                        }
                    } catch (UnknownResourceException e) {
                        // If the resource (e.g. the activity task) no longer exists,
                        // then there's no reason to continue it, since we can't report success or failure.
                        // If this happens, usually it means the task timed out in SWF
                        // before we managed to send a heartbeat.
                        log.warn("The heartbeat told us that the resource no longer exists, so we'll cancel this task."
                                 + " {}", e.getMessage());
                        canceled = true;
                    } catch (Exception e) {
                        log.warn("Got an error while trying to record a heartbeat.", e);
                        // If we weren't able to heartbeat, we don't want to fail.
                        // It may have been transient and we'll make another attempt after HEARTBEAT_INTERVAL has passed.
                        // If it fails enough for the activity to time out, oh well...
                    }
                    if (canceled) {
                        // If a cancel was requested for the activity, we kill the task and bail out.
                        activityThread.interrupt();
                        activityThread.join(); // we interrupted it, so we'll wait for it to end
                    }
                }
            } catch (InterruptedException e) {
                metrics.addCount(formatActivityThreadInterruptedMetricName(task.activityType().name()), 1.0);
                // We don't really know what happened in this case, we'll use e.getCause() if it's not null, otherwise e.
                Throwable cause = (e.getCause() == null ? e : e.getCause());
                String msg = "Error executing activity task";
                log.warn(msg, cause);
                throw new RuntimeException(msg, cause);
            }

            metrics.endDuration(activityExecutionTimeMetricName);

            // at this point the executor should be done.
            if (canceled) {
                metrics.addCount(formatActivityTaskCancelledMetricName(task.activityType().name()), 1.0);
                // if we were cancelled, report that we successfully cancelled the step.
                swf.respondActivityTaskCanceled(RespondActivityTaskCanceledRequest.builder().taskToken(task.taskToken()).build());
            } else {
                StepResult result = executor.getResult();

                switch (result.getAction()) {
                    case COMPLETE:
                        RespondActivityTaskCompletedRequest rac = RespondActivityTaskCompletedRequest.builder()
                                .taskToken(task.taskToken()).result(executor.getOutput()).build();
                        RetryUtils.executeWithInlineBackoff(() -> swf.respondActivityTaskCompleted(rac),
                                                            20, Duration.ofSeconds(2), metrics,
                                                            RESPOND_ACTIVITY_TASK_COMPLETED_METRIC_PREFIX);
                        break;
                    case RETRY:
                        // SWF doesn't model retries, so we model it as a failure here.
                        // The output from the executor is included here,
                        // as it may contain the stack trace for the exception that caused the retry.
                        RespondActivityTaskFailedRequest raf = RespondActivityTaskFailedRequest.builder()
                                .taskToken(task.taskToken()).reason(prepareRetryReason(result.getMessage()))
                                .details(prepareRetryDetails(executor.getOutput())).build();
                        RetryUtils.executeWithInlineBackoff(() -> swf.respondActivityTaskFailed(raf),
                                                            20, Duration.ofSeconds(2), metrics,
                                                            RESPOND_ACTIVITY_TASK_FAILED_METRIC_PREFIX);
                        break;
                    default:
                        throw new RuntimeException("Unknown result action: " + result.getAction());
                }
            }
        } catch (Exception e) {
            log.warn("Got an exception executing the activity", e);
        }
    }