private static CompletableFuture getInstanceCloseEventAsync()

in src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java [272:333]


  /**
   * Asynchronously long-polls the workflow history for the close event of the given execution.
   *
   * <p>A history request filtered to {@code HistoryEventFilterType.CLOSE_EVENT} with {@code
   * waitForNewEvent} enabled is sent through {@code getWorkflowExecutionHistoryAsync}. When the
   * response arrives:
   *
   * <ul>
   *   <li>if a deadline is set and has already passed, the returned future fails with a wrapped
   *       {@link TimeoutException};
   *   <li>if the response carries no history events (empty poll), the same execution is polled
   *       again with the time that is left;
   *   <li>if the first returned event is not a workflow completion event, a {@link
   *       RuntimeException} is thrown;
   *   <li>if the event is {@code WorkflowExecutionContinuedAsNew}, polling continues against the
   *       new run id with the time that is left;
   *   <li>otherwise the future completes with that event.
   * </ul>
   *
   * @param service service used to issue the history request
   * @param domain domain of the workflow execution
   * @param workflowExecution execution (workflow id and run id) to wait on
   * @param pageToken page token to set on the history request
   * @param timeout maximum time to wait, expressed in {@code unit}; {@code 0} disables the deadline
   *     check performed by this method
   * @param unit unit of {@code timeout}
   * @return future that completes with the close event of the workflow, following continue-as-new
   *     runs
   */
  private static CompletableFuture<HistoryEvent> getInstanceCloseEventAsync(
      IWorkflowService service,
      String domain,
      final WorkflowExecution workflowExecution,
      byte[] pageToken,
      long timeout,
      TimeUnit unit) {
    // TODO: Interrupt service long poll call on timeout and on interrupt
    // nanoTime is monotonic, so the elapsed time cannot go negative on wall-clock adjustments.
    long startNanos = System.nanoTime();
    GetWorkflowExecutionHistoryRequest request = new GetWorkflowExecutionHistoryRequest();
    request.setDomain(domain);
    request.setExecution(workflowExecution);
    request.setHistoryEventFilterType(HistoryEventFilterType.CLOSE_EVENT);
    request.setWaitForNewEvent(true);
    request.setNextPageToken(pageToken);
    CompletableFuture<GetWorkflowExecutionHistoryResponse> response =
        getWorkflowExecutionHistoryAsync(service, request, timeout, unit);
    return response.thenComposeAsync(
        (r) -> {
          long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
          boolean hasDeadline = timeout != 0;
          if (hasDeadline && elapsedMillis > unit.toMillis(timeout)) {
            throw CheckedExceptionWrapper.wrap(
                new TimeoutException(
                    "WorkflowId="
                        + workflowExecution.getWorkflowId()
                        + ", runId="
                        + workflowExecution.getRunId()
                        + ", timeout="
                        + timeout
                        + ", unit="
                        + unit));
          }
          // Budget for any follow-up poll. The elapsed time is in milliseconds, so the remainder
          // is computed and passed on in milliseconds rather than subtracted from a value in
          // `unit`. A timeout of 0 keeps meaning "no deadline"; a remainder of exactly 0 is
          // clamped to 1 ms so that it is not mistaken for "no deadline" by the next call.
          long nextTimeout =
              hasDeadline ? Math.max(unit.toMillis(timeout) - elapsedMillis, 1) : 0;
          TimeUnit nextUnit = hasDeadline ? TimeUnit.MILLISECONDS : unit;

          History history = r.getHistory();
          if (history == null || history.getEvents().size() == 0) {
            // Empty poll returned
            return getInstanceCloseEventAsync(
                service, domain, workflowExecution, pageToken, nextTimeout, nextUnit);
          }
          HistoryEvent event = history.getEvents().get(0);
          if (!isWorkflowExecutionCompletedEvent(event)) {
            throw new RuntimeException("Last history event is not completion event: " + event);
          }
          // Workflow called continueAsNew. Start polling the new generation with new runId.
          if (event.getEventType() == EventType.WorkflowExecutionContinuedAsNew) {
            WorkflowExecution nextWorkflowExecution =
                new WorkflowExecution()
                    .setWorkflowId(workflowExecution.getWorkflowId())
                    .setRunId(
                        event
                            .getWorkflowExecutionContinuedAsNewEventAttributes()
                            .getNewExecutionRunId());
            return getInstanceCloseEventAsync(
                service,
                domain,
                nextWorkflowExecution,
                r.getNextPageToken(),
                nextTimeout,
                nextUnit);
          }
          return CompletableFuture.completedFuture(event);
        });
  }