in flux/src/main/java/software/amazon/aws/clients/swf/flux/poller/ActivityTaskPoller.java [169:268]
/**
 * Executes an activity task on a dedicated thread, heartbeating to SWF every {@code HEARTBEAT_INTERVAL}
 * while it runs, and then reports the outcome of the task back to SWF.
 *
 * <p>If a heartbeat reports that cancellation was requested, or that the task no longer exists in SWF,
 * the activity thread is interrupted and joined, and the task is reported as canceled. Otherwise the
 * executor's {@link StepResult} is reported: {@code COMPLETE} as a completed task and {@code RETRY} as a
 * failed task, both sent with inline backoff.
 *
 * <p>Any {@link Exception} raised along the way (including while closing the metric recorder) is logged
 * rather than propagated. If this thread is interrupted while waiting on the activity, the activity
 * thread is interrupted as well and this thread's interrupt status is restored before returning.
 *
 * @param task the activity task received from SWF
 * @param workflow the workflow the task belongs to
 * @param step the workflow step to execute for this task
 */
private void executeWithHeartbeat(PollForActivityTaskResponse task, Workflow workflow, WorkflowStep step) {
    boolean interrupted = false;
    try (MetricRecorder metrics = metricsFactory.newMetricRecorder(this.getClass().getSimpleName()
                                                                   + ".executeWithHeartbeat")) {
        String activityExecutionTimeMetricName = formatActivityExecutionTimeMetricName(task.activityType().name());
        metrics.startDuration(activityExecutionTimeMetricName);
        ActivityExecutor executor = new ActivityExecutor(identity, task, workflow, step, metrics, metricsFactory);
        Thread activityThread = new Thread(executor);
        activityThread.start();
        boolean canceled = false;
        try {
            while (true) {
                activityThread.join(HEARTBEAT_INTERVAL.toMillis());
                if (!activityThread.isAlive()) {
                    log.debug("The activity thread has ended successfully.");
                    break;
                }
                if (sendHeartbeatAndCheckForCancellation(task, workflow, step)) {
                    canceled = true;
                    // If a cancel was requested for the activity, we kill the task and wait for it to end;
                    // the next iteration then sees the thread is no longer alive and exits the loop.
                    activityThread.interrupt();
                    activityThread.join();
                }
            }
        } catch (InterruptedException e) {
            // join() clears this thread's interrupt flag when it throws. Remember the interruption so the
            // flag can be restored in the finally block, after the metric recorder has been closed.
            interrupted = true;
            // Nothing will heartbeat for, or report the result of, the activity any more,
            // so stop it instead of leaving an orphaned thread running.
            activityThread.interrupt();
            metrics.addCount(formatActivityThreadInterruptedMetricName(task.activityType().name()), 1.0);
            // We don't really know what happened in this case, we'll use e.getCause() if it's not null, otherwise e.
            Throwable cause = (e.getCause() == null ? e : e.getCause());
            String msg = "Error executing activity task";
            log.warn(msg, cause);
            throw new RuntimeException(msg, cause);
        }
        metrics.endDuration(activityExecutionTimeMetricName);
        // At this point the executor should be done.
        if (canceled) {
            metrics.addCount(formatActivityTaskCancelledMetricName(task.activityType().name()), 1.0);
            // If we were cancelled, report that we successfully cancelled the step.
            swf.respondActivityTaskCanceled(RespondActivityTaskCanceledRequest.builder().taskToken(task.taskToken()).build());
        } else {
            reportActivityResult(task, executor, metrics);
        }
    } catch (Exception e) {
        log.warn("Got an exception executing the activity", e);
    } finally {
        if (interrupted) {
            // Restore the interrupt status that join() cleared, so callers can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}

/**
 * Records one heartbeat for the given activity task and reports whether the task should be canceled.
 *
 * <p>Heartbeat failures other than {@link UnknownResourceException} are logged and otherwise ignored.
 * They may be transient, and another attempt is made on the next heartbeat interval.
 *
 * @param task the activity task being executed
 * @param workflow the workflow the task belongs to (used for logging)
 * @param step the workflow step being executed (used for logging)
 * @return true if SWF requested cancellation or reported that the task no longer exists; false otherwise
 */
private boolean sendHeartbeatAndCheckForCancellation(PollForActivityTaskResponse task, Workflow workflow,
                                                     WorkflowStep step) {
    try {
        RecordActivityTaskHeartbeatRequest request
                = RecordActivityTaskHeartbeatRequest.builder().taskToken(task.taskToken()).build();
        log.debug("Sending heartbeat for workflow id {} at step {} with activity id {}.",
                  task.workflowExecution().workflowId(), TaskNaming.activityName(workflow, step),
                  task.activityId());
        RecordActivityTaskHeartbeatResponse status = swf.recordActivityTaskHeartbeat(request);
        log.debug("Recorded heartbeat for workflow id {} at step {} with activity id {}.",
                  task.workflowExecution().workflowId(), TaskNaming.activityName(workflow, step),
                  task.activityId());
        if (status.cancelRequested()) {
            log.warn("The heartbeat told us that we received a cancellation request for this task.");
            return true;
        }
        return false;
    } catch (UnknownResourceException e) {
        // If the resource (e.g. the activity task) no longer exists,
        // then there's no reason to continue it, since we can't report success or failure.
        // If this happens, usually it means the task timed out in SWF
        // before we managed to send a heartbeat.
        log.warn("The heartbeat told us that the resource no longer exists, so we'll cancel this task."
                 + " {}", e.getMessage());
        return true;
    } catch (Exception e) {
        log.warn("Got an error while trying to record a heartbeat.", e);
        // If we weren't able to heartbeat, we don't want to fail.
        // It may have been transient and we'll make another attempt after HEARTBEAT_INTERVAL has passed.
        // If it fails enough for the activity to time out, oh well...
        return false;
    }
}

/**
 * Reports the result of a finished, non-canceled activity to SWF.
 *
 * <p>{@code COMPLETE} is sent as a completed task. {@code RETRY} is sent as a failed task, because SWF
 * does not model retries. Both responses use inline backoff.
 *
 * @param task the activity task that was executed
 * @param executor the executor that ran the activity; its thread must have finished
 * @param metrics the recorder that receives the retry metrics
 * @throws RuntimeException if the result action is neither COMPLETE nor RETRY
 */
private void reportActivityResult(PollForActivityTaskResponse task, ActivityExecutor executor,
                                  MetricRecorder metrics) {
    StepResult result = executor.getResult();
    switch (result.getAction()) {
        case COMPLETE:
            RespondActivityTaskCompletedRequest rac = RespondActivityTaskCompletedRequest.builder()
                    .taskToken(task.taskToken()).result(executor.getOutput()).build();
            RetryUtils.executeWithInlineBackoff(() -> swf.respondActivityTaskCompleted(rac),
                                                20, Duration.ofSeconds(2), metrics,
                                                RESPOND_ACTIVITY_TASK_COMPLETED_METRIC_PREFIX);
            break;
        case RETRY:
            // SWF doesn't model retries, so we model it as a failure here.
            // The output from the executor is included here,
            // as it may contain the stack trace for the exception that caused the retry.
            RespondActivityTaskFailedRequest raf = RespondActivityTaskFailedRequest.builder()
                    .taskToken(task.taskToken()).reason(prepareRetryReason(result.getMessage()))
                    .details(prepareRetryDetails(executor.getOutput())).build();
            RetryUtils.executeWithInlineBackoff(() -> swf.respondActivityTaskFailed(raf),
                                                20, Duration.ofSeconds(2), metrics,
                                                RESPOND_ACTIVITY_TASK_FAILED_METRIC_PREFIX);
            break;
        default:
            throw new RuntimeException("Unknown result action: " + result.getAction());
    }
}