in src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java [174:260]
private static HistoryEvent getInstanceCloseEvent(
IWorkflowService service,
String domain,
WorkflowExecution workflowExecution,
long timeout,
TimeUnit unit)
throws TimeoutException, EntityNotExistsError {
byte[] pageToken = null;
GetWorkflowExecutionHistoryResponse response;
// TODO: Interrupt service long poll call on timeout and on interrupt
long start = System.currentTimeMillis();
HistoryEvent event;
do {
if (timeout != 0 && System.currentTimeMillis() - start > unit.toMillis(timeout)) {
throw new TimeoutException(
"WorkflowId="
+ workflowExecution.getWorkflowId()
+ ", runId="
+ workflowExecution.getRunId()
+ ", timeout="
+ timeout
+ ", unit="
+ unit);
}
GetWorkflowExecutionHistoryRequest r = new GetWorkflowExecutionHistoryRequest();
r.setDomain(domain);
r.setExecution(workflowExecution);
r.setHistoryEventFilterType(HistoryEventFilterType.CLOSE_EVENT);
r.setNextPageToken(pageToken);
r.setWaitForNewEvent(true);
r.setSkipArchival(true);
RetryOptions retryOptions = getRetryOptionWithTimeout(timeout, unit);
try {
response =
RpcRetryer.retryWithResult(
retryOptions,
() -> service.GetWorkflowExecutionHistoryWithTimeout(r, unit.toMillis(timeout)));
} catch (EntityNotExistsError e) {
if (e.activeCluster != null
&& e.currentCluster != null
&& !e.activeCluster.equals(e.currentCluster)) {
// Current cluster is passive cluster. Execution might not exist because of replication
// lag. If we are still within timeout, wait for a little bit and retry.
if (timeout != 0
&& System.currentTimeMillis() + ENTITY_NOT_EXIST_RETRY_WAIT_MILLIS - start
> unit.toMillis(timeout)) {
throw e;
}
try {
Thread.sleep(ENTITY_NOT_EXIST_RETRY_WAIT_MILLIS);
} catch (InterruptedException ie) {
// Throw entity not exist here.
throw e;
}
continue;
}
throw e;
} catch (TException e) {
throw CheckedExceptionWrapper.wrap(e);
}
pageToken = response.getNextPageToken();
History history = response.getHistory();
if (history != null && history.getEvents().size() > 0) {
event = history.getEvents().get(0);
if (!isWorkflowExecutionCompletedEvent(event)) {
throw new RuntimeException("Last history event is not completion event: " + event);
}
// Workflow called continueAsNew. Start polling the new generation with new runId.
if (event.getEventType() == EventType.WorkflowExecutionContinuedAsNew) {
pageToken = null;
workflowExecution =
new WorkflowExecution()
.setWorkflowId(workflowExecution.getWorkflowId())
.setRunId(
event
.getWorkflowExecutionContinuedAsNewEventAttributes()
.getNewExecutionRunId());
continue;
}
break;
}
} while (true);
return event;
}