in src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java [248:342]
private void sendReply(
IWorkflowService service,
PollForDecisionTaskResponse task,
DecisionTaskHandler.Result response)
throws TException {
RespondDecisionTaskCompletedRequest taskCompleted = response.getTaskCompleted();
if (taskCompleted != null) {
taskCompleted.setIdentity(options.getIdentity());
taskCompleted.setTaskToken(task.getTaskToken());
taskCompleted.setBinaryChecksum(BinaryChecksum.getBinaryChecksum());
RpcRetryer.retry(
() -> {
RespondDecisionTaskCompletedResponse taskCompletedResponse = null;
List<Task> activityTasks = new ArrayList<>();
try {
if (ldaTaskPoller != null) {
for (Decision decision : taskCompleted.getDecisions()) {
ScheduleActivityTaskDecisionAttributes attr =
decision.getScheduleActivityTaskDecisionAttributes();
if (attr != null && taskList.equals(attr.getTaskList().getName())) {
// assume the activity type is in registry otherwise the activity would be
// failed and retried from server
Task activityTask =
new Task(
attr.getActivityId(),
attr.getActivityType(),
attr.bufferForInput(),
attr.getScheduleToCloseTimeoutSeconds(),
attr.getStartToCloseTimeoutSeconds(),
attr.getHeartbeatTimeoutSeconds(),
task.getWorkflowType(),
domain,
attr.getHeader(),
task.getWorkflowExecution());
if (ldaTaskPoller.apply(activityTask)) {
options
.getMetricsScope()
.counter(MetricsType.ACTIVITY_LOCAL_DISPATCH_SUCCEED_COUNTER)
.inc(1);
decision
.getScheduleActivityTaskDecisionAttributes()
.setRequestLocalDispatch(true);
activityTasks.add(activityTask);
} else {
// all pollers are busy - no room to optimize
options
.getMetricsScope()
.counter(MetricsType.ACTIVITY_LOCAL_DISPATCH_FAILED_COUNTER)
.inc(1);
}
}
}
}
taskCompletedResponse = service.RespondDecisionTaskCompleted(taskCompleted);
} finally {
for (Task activityTask : activityTasks) {
boolean started = false;
if (taskCompletedResponse != null
&& taskCompletedResponse.getActivitiesToDispatchLocally() != null) {
ActivityLocalDispatchInfo activityLocalDispatchInfo =
taskCompletedResponse
.getActivitiesToDispatchLocally()
.getOrDefault(activityTask.activityId, null);
if (activityLocalDispatchInfo != null) {
activityTask.scheduledTimestamp =
activityLocalDispatchInfo.getScheduledTimestamp();
activityTask.startedTimestamp =
activityLocalDispatchInfo.getStartedTimestamp();
activityTask.scheduledTimestampOfThisAttempt =
activityLocalDispatchInfo.getScheduledTimestampOfThisAttempt();
activityTask.taskToken = activityLocalDispatchInfo.bufferForTaskToken();
started = true;
}
}
activityTask.notify(started);
}
}
});
} else {
RespondDecisionTaskFailedRequest taskFailed = response.getTaskFailed();
if (taskFailed != null) {
taskFailed.setIdentity(options.getIdentity());
taskFailed.setTaskToken(task.getTaskToken());
taskFailed.setBinaryChecksum(BinaryChecksum.getBinaryChecksum());
RpcRetryer.retry(() -> service.RespondDecisionTaskFailed(taskFailed));
} else {
RespondQueryTaskCompletedRequest queryCompleted = response.getQueryCompleted();
if (queryCompleted != null) {
queryCompleted.setTaskToken(task.getTaskToken());
// Do not retry query response.
service.RespondQueryTaskCompleted(queryCompleted);
}
}
}
}