private static HistoryEvent getInstanceCloseEvent()

in src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java [174:260]


  /**
   * Long-polls the service for the close event of a workflow execution, following
   * continue-as-new chains until a terminal close event is found.
   *
   * <p>Each iteration issues a {@code GetWorkflowExecutionHistory} request filtered to the close
   * event with {@code waitForNewEvent} set. When the returned event is {@code
   * WorkflowExecutionContinuedAsNew}, polling restarts against the new run id of the same
   * workflow id.
   *
   * <p>If the service reports {@link EntityNotExistsError} and the error indicates that the
   * current cluster differs from the active cluster, the call sleeps for {@code
   * ENTITY_NOT_EXIST_RETRY_WAIT_MILLIS} and retries, as long as doing so stays within the timeout.
   *
   * @param service service client used to fetch history
   * @param domain domain of the workflow execution
   * @param workflowExecution execution (workflow id and run id) to wait on
   * @param timeout overall time budget; {@code 0} disables the elapsed-time checks in this method
   * @param unit unit of {@code timeout}
   * @return the close event of the final run in the chain
   * @throws TimeoutException if the elapsed time exceeds the timeout before a close event is seen
   * @throws EntityNotExistsError if the execution is not found and a retry is not applicable, not
   *     affordable within the timeout, or the retry wait was interrupted (the thread's interrupt
   *     status is restored in that case)
   * @throws RuntimeException if the event returned by the close-event filter is not a workflow
   *     completion event, or (wrapped) if the service call fails with another {@code TException}
   */
  private static HistoryEvent getInstanceCloseEvent(
      IWorkflowService service,
      String domain,
      WorkflowExecution workflowExecution,
      long timeout,
      TimeUnit unit)
      throws TimeoutException, EntityNotExistsError {
    // Loop-invariant: the overall budget expressed in milliseconds.
    final long timeoutMillis = unit.toMillis(timeout);
    byte[] pageToken = null;
    GetWorkflowExecutionHistoryResponse response;
    // TODO: Interrupt service long poll call on timeout and on interrupt
    // NOTE(review): every attempt is given the full timeout rather than the remaining budget, so
    // the total wait can exceed the requested timeout — confirm whether that is intended.
    long start = System.currentTimeMillis();
    HistoryEvent event;
    do {
      if (timeout != 0 && System.currentTimeMillis() - start > timeoutMillis) {
        throw new TimeoutException(
            "WorkflowId="
                + workflowExecution.getWorkflowId()
                + ", runId="
                + workflowExecution.getRunId()
                + ", timeout="
                + timeout
                + ", unit="
                + unit);
      }

      GetWorkflowExecutionHistoryRequest r = new GetWorkflowExecutionHistoryRequest();
      r.setDomain(domain);
      r.setExecution(workflowExecution);
      r.setHistoryEventFilterType(HistoryEventFilterType.CLOSE_EVENT);
      r.setNextPageToken(pageToken);
      r.setWaitForNewEvent(true);
      r.setSkipArchival(true);
      RetryOptions retryOptions = getRetryOptionWithTimeout(timeout, unit);
      try {
        response =
            RpcRetryer.retryWithResult(
                retryOptions,
                () -> service.GetWorkflowExecutionHistoryWithTimeout(r, timeoutMillis));
      } catch (EntityNotExistsError e) {
        boolean servedByPassiveCluster =
            e.activeCluster != null
                && e.currentCluster != null
                && !e.activeCluster.equals(e.currentCluster);
        if (!servedByPassiveCluster) {
          throw e;
        }
        // Current cluster is passive cluster. Execution might not exist because of replication
        // lag. If we are still within timeout, wait for a little bit and retry.
        if (timeout != 0
            && System.currentTimeMillis() + ENTITY_NOT_EXIST_RETRY_WAIT_MILLIS - start
                > timeoutMillis) {
          throw e;
        }

        try {
          Thread.sleep(ENTITY_NOT_EXIST_RETRY_WAIT_MILLIS);
        } catch (InterruptedException ie) {
          // Restore the interrupt status so callers further up the stack can still observe the
          // interruption, then surface the original entity-not-exists error.
          Thread.currentThread().interrupt();
          throw e;
        }
        continue;
      } catch (TException e) {
        throw CheckedExceptionWrapper.wrap(e);
      }

      pageToken = response.getNextPageToken();
      History history = response.getHistory();
      // A missing or empty event list means no close event yet; poll again with the new token.
      if (history != null && history.getEvents() != null && !history.getEvents().isEmpty()) {
        event = history.getEvents().get(0);
        if (!isWorkflowExecutionCompletedEvent(event)) {
          throw new RuntimeException("Last history event is not completion event: " + event);
        }
        // Workflow called continueAsNew. Start polling the new generation with new runId.
        if (event.getEventType() == EventType.WorkflowExecutionContinuedAsNew) {
          pageToken = null;
          workflowExecution =
              new WorkflowExecution()
                  .setWorkflowId(workflowExecution.getWorkflowId())
                  .setRunId(
                      event
                          .getWorkflowExecutionContinuedAsNewEventAttributes()
                          .getNewExecutionRunId());
          continue;
        }
        break;
      }
    } while (true);
    return event;
  }