in src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java [272:333]
/**
 * Asynchronously long-polls the service for the close event of a workflow execution.
 *
 * <p>Issues a {@code GetWorkflowExecutionHistoryRequest} filtered to {@code CLOSE_EVENT} with
 * {@code waitForNewEvent} set. When the response carries no events the poll is repeated. When
 * the returned event is {@code WorkflowExecutionContinuedAsNew}, polling continues against the
 * new run id taken from that event.
 *
 * <p>The elapsed time is checked only after a response arrives (see the TODO below), so the
 * returned future does not fail at the exact moment the timeout expires.
 *
 * @param service service used to fetch the workflow history
 * @param domain domain set on the history request
 * @param workflowExecution execution (workflow id and run id) to wait for
 * @param pageToken page token set on the history request
 * @param timeout maximum time to wait, expressed in {@code unit}; {@code 0} disables the
 *     elapsed-time check performed here. The value is also forwarded to {@code
 *     getWorkflowExecutionHistoryAsync}.
 * @param unit unit of {@code timeout}
 * @return future completed with the first event of the returned history once it is a completion
 *     event. It completes exceptionally with a wrapped {@link TimeoutException} when the
 *     response arrives after the timeout, or with a {@link RuntimeException} when the returned
 *     event is not a completion event.
 */
private static CompletableFuture<HistoryEvent> getInstanceCloseEventAsync(
    IWorkflowService service,
    String domain,
    final WorkflowExecution workflowExecution,
    byte[] pageToken,
    long timeout,
    TimeUnit unit) {
  // TODO: Interrupt service long poll call on timeout and on interrupt
  // nanoTime is monotonic, so wall-clock adjustments cannot distort the elapsed time.
  final long startNanos = System.nanoTime();
  GetWorkflowExecutionHistoryRequest request = new GetWorkflowExecutionHistoryRequest();
  request.setDomain(domain);
  request.setExecution(workflowExecution);
  request.setHistoryEventFilterType(HistoryEventFilterType.CLOSE_EVENT);
  request.setWaitForNewEvent(true);
  request.setNextPageToken(pageToken);
  CompletableFuture<GetWorkflowExecutionHistoryResponse> response =
      getWorkflowExecutionHistoryAsync(service, request, timeout, unit);
  return response.thenComposeAsync(
      (r) -> {
        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        // A timeout of 0 means "no deadline"; unit is only consulted for a bounded wait.
        final boolean bounded = timeout != 0;
        final long timeoutMillis = bounded ? unit.toMillis(timeout) : 0;
        if (bounded && elapsedMillis > timeoutMillis) {
          throw CheckedExceptionWrapper.wrap(
              new TimeoutException(
                  "WorkflowId="
                      + workflowExecution.getWorkflowId()
                      + ", runId="
                      + workflowExecution.getRunId()
                      + ", timeout="
                      + timeout
                      + ", unit="
                      + unit));
        }

        // Remaining budget for any follow-up poll. It is tracked in milliseconds because the
        // elapsed time is in milliseconds; subtracting it from a value expressed in the
        // caller's unit would be wrong for every unit other than MILLISECONDS. The remainder
        // is clamped to at least 1 ms so that an exactly exhausted budget is not mistaken for
        // the "no deadline" sentinel 0.
        final long nextTimeout;
        final TimeUnit nextUnit;
        if (bounded) {
          nextTimeout = Math.max(1L, timeoutMillis - elapsedMillis);
          nextUnit = TimeUnit.MILLISECONDS;
        } else {
          nextTimeout = 0L;
          nextUnit = unit;
        }

        History history = r.getHistory();
        if (history == null || history.getEvents().size() == 0) {
          // Empty poll returned
          // NOTE(review): the incoming pageToken is reused here rather than
          // r.getNextPageToken() - confirm this is the intended behavior.
          return getInstanceCloseEventAsync(
              service, domain, workflowExecution, pageToken, nextTimeout, nextUnit);
        }
        HistoryEvent event = history.getEvents().get(0);
        if (!isWorkflowExecutionCompletedEvent(event)) {
          throw new RuntimeException("Last history event is not completion event: " + event);
        }
        // Workflow called continueAsNew. Start polling the new generation with new runId.
        if (event.getEventType() == EventType.WorkflowExecutionContinuedAsNew) {
          WorkflowExecution nextWorkflowExecution =
              new WorkflowExecution()
                  .setWorkflowId(workflowExecution.getWorkflowId())
                  .setRunId(
                      event
                          .getWorkflowExecutionContinuedAsNewEventAttributes()
                          .getNewExecutionRunId());
          // NOTE(review): the token passed on comes from the previous run's response -
          // confirm the service accepts it for the new run id.
          return getInstanceCloseEventAsync(
              service,
              domain,
              nextWorkflowExecution,
              r.getNextPageToken(),
              nextTimeout,
              nextUnit);
        }
        return CompletableFuture.completedFuture(event);
      });
}