in src/main/java/com/uber/cadence/internal/testservice/StateMachines.java [684:760]
/**
 * Handles a schedule-activity decision: appends an {@code ActivityTaskScheduled} history event
 * to {@code ctx}, queues a matching {@link ActivityTask} on the task list named by the decision,
 * and registers a commit callback that stores the scheduled event id, the activity task and the
 * retry state on {@code data}.
 *
 * <p>When the decision carries a retry policy, the schedule-to-close and schedule-to-start
 * timeouts requested by the decision are both replaced by the policy's expiration interval, or
 * by the workflow's execution start-to-close timeout when that interval is not positive.
 *
 * @param ctx request context that receives the history event, the activity task and the commit
 *     callback
 * @param data activity state; {@code scheduledEvent} is written immediately, the remaining
 *     fields are written from the commit callback
 * @param d attributes of the schedule-activity decision being processed
 * @param decisionTaskCompletedEventId id recorded on the scheduled event attributes
 * @throws BadRequestError declared by the signature; which of the calls made here raises it is
 *     not visible from this method
 */
private static void scheduleActivityTask(
    RequestContext ctx,
    ActivityTaskData data,
    ScheduleActivityTaskDecisionAttributes d,
    long decisionTaskCompletedEventId)
    throws BadRequestError {
  final int requestedScheduleToClose = d.getScheduleToCloseTimeoutSeconds();
  final int requestedScheduleToStart = d.getScheduleToStartTimeoutSeconds();
  final RetryPolicy retryPolicy = d.getRetryPolicy();

  // Each of these is assigned exactly once so the commit lambda below can capture retryState.
  final RetryState retryState;
  final int scheduleToCloseSeconds;
  final int scheduleToStartSeconds;
  if (retryPolicy == null) {
    // No retries requested: keep the timeouts exactly as the decision specified them.
    retryState = null;
    scheduleToCloseSeconds = requestedScheduleToClose;
    scheduleToStartSeconds = requestedScheduleToStart;
  } else {
    final int expirationSeconds = retryPolicy.getExpirationIntervalInSeconds();
    final long expirationMillis = TimeUnit.SECONDS.toMillis(expirationSeconds);
    final long expiresAtMillis = data.store.currentTimeMillis() + expirationMillis;
    retryState = new RetryState(retryPolicy, expiresAtMillis);
    // Widen both activity timeouts so the retry policy can run up to its expiration; without a
    // positive expiration interval the workflow's own start-to-close timeout is the limit.
    final int retryWindowSeconds =
        expirationSeconds > 0
            ? expirationSeconds
            : data.startWorkflowExecutionRequest.getExecutionStartToCloseTimeoutSeconds();
    scheduleToCloseSeconds = retryWindowSeconds;
    scheduleToStartSeconds = retryWindowSeconds;
  }

  final ActivityTaskScheduledEventAttributes scheduledAttributes =
      new ActivityTaskScheduledEventAttributes();
  scheduledAttributes.setInput(d.getInput());
  scheduledAttributes.setActivityId(d.getActivityId());
  scheduledAttributes.setActivityType(d.getActivityType());
  // A decision without an explicit domain falls back to the domain of the request context.
  scheduledAttributes.setDomain(d.getDomain() != null ? d.getDomain() : ctx.getDomain());
  scheduledAttributes.setHeartbeatTimeoutSeconds(d.getHeartbeatTimeoutSeconds());
  scheduledAttributes.setScheduleToCloseTimeoutSeconds(scheduleToCloseSeconds);
  scheduledAttributes.setScheduleToStartTimeoutSeconds(scheduleToStartSeconds);
  scheduledAttributes.setStartToCloseTimeoutSeconds(d.getStartToCloseTimeoutSeconds());
  scheduledAttributes.setTaskList(d.getTaskList());
  scheduledAttributes.setRetryPolicy(retryPolicy);
  scheduledAttributes.setHeader(d.getHeader());
  scheduledAttributes.setDecisionTaskCompletedEventId(decisionTaskCompletedEventId);

  // Stored right away rather than in onCommit because processScheduleActivityTask uses it.
  data.scheduledEvent = scheduledAttributes;

  final HistoryEvent scheduledEvent = new HistoryEvent();
  scheduledEvent.setEventType(EventType.ActivityTaskScheduled);
  scheduledEvent.setActivityTaskScheduledEventAttributes(scheduledAttributes);
  final long scheduledEventId = ctx.addEvent(scheduledEvent);

  // Response handed to the worker that polls this activity; this is the first attempt (0).
  final PollForActivityTaskResponse pollResponse = new PollForActivityTaskResponse();
  pollResponse.setWorkflowDomain(ctx.getDomain());
  pollResponse.setWorkflowType(data.startWorkflowExecutionRequest.workflowType);
  pollResponse.setActivityType(d.getActivityType());
  pollResponse.setWorkflowExecution(ctx.getExecution());
  pollResponse.setActivityId(d.getActivityId());
  pollResponse.setInput(d.getInput());
  pollResponse.setHeartbeatTimeoutSeconds(d.getHeartbeatTimeoutSeconds());
  pollResponse.setScheduleToCloseTimeoutSeconds(scheduleToCloseSeconds);
  pollResponse.setStartToCloseTimeoutSeconds(d.getStartToCloseTimeoutSeconds());
  pollResponse.setScheduledTimestamp(ctx.currentTimeInNanoseconds());
  pollResponse.setScheduledTimestampOfThisAttempt(ctx.currentTimeInNanoseconds());
  pollResponse.setHeader(d.getHeader());
  pollResponse.setAttempt(0);

  final TaskListId targetTaskList = new TaskListId(ctx.getDomain(), d.getTaskList().getName());
  final ActivityTask activityTask = new ActivityTask(targetTaskList, pollResponse);
  ctx.addActivityTask(activityTask);

  // The remaining activity state is written from the commit callback.
  ctx.onCommit(
      committedHistorySize -> {
        data.scheduledEventId = scheduledEventId;
        data.activityTask = activityTask;
        data.retryState = retryState;
      });
}