public GetDataAvailabilityResponse waitDataAvailable()

in src/main/java/com/uber/rss/clients/DataBlockSocketReadClient.java [169:233]


  /**
   * Blocks until the server reports that data for {@code appShufflePartitionId} is available (all
   * mappers finished), then records the committed task attempts on this client.
   *
   * <p>Polls {@code getDataAvailability()} through {@code RetryUtils.retryUntilNotNull}, starting
   * at {@code dataAvailablePollInterval}, with the interval bounded by
   * {@code dataAvailablePollInterval * POLL_INTERVAL_MAX_MULTIPLIER}, for up to
   * {@code dataAvailableWaitTime} millis (exact back-off behaviour is defined by RetryUtils). The
   * time spent waiting is recorded in the reducer-wait-time metric.
   *
   * <p>{@code commitMapTaskCommitStatus} and {@code commitTaskAttemptIds} are only assigned after
   * all validations below pass, so a failed call does not leave this client in a state that makes
   * later calls think data was already available.
   *
   * @return the response that reported data as available
   * @throws RssInvalidStateException if {@code commitMapTaskCommitStatus} is already set (e.g. a
   *     previous call succeeded)
   * @throws RssShuffleDataNotAvailableException if no polled response reported data as available
   *     within the wait time
   * @throws RssInvalidDataException if the available response has no map task commit status, or
   *     its committed task attempt ids do not contain every id in {@code fetchTaskAttemptIds}
   */
  public GetDataAvailabilityResponse waitDataAvailable() {
    if (this.commitMapTaskCommitStatus != null) {
      throw new RssInvalidStateException("Data already available, should not wait again");
    }

    long startTime = System.currentTimeMillis();
    logger.info("Waiting for all mappers finished: {}, {}", appShufflePartitionId, connectionInfo);

    Stopwatch reducerWaitTimeStopwatch = metrics.getReducerWaitTime().start();
    // Remember the most recent response even when it is not "available", so a timeout can report
    // which task attempts had committed at that point.
    final ObjectWrapper<GetDataAvailabilityResponse> getDataAvailabilityRetryLastResult = new ObjectWrapper<>();
    try {
      RetryUtils.retryUntilNotNull(dataAvailablePollInterval, dataAvailablePollInterval * POLL_INTERVAL_MAX_MULTIPLIER, dataAvailableWaitTime, () -> {
        GetDataAvailabilityResponse getDataAvailabilityResponse = getDataAvailability();
        getDataAvailabilityRetryLastResult.setObject(getDataAvailabilityResponse);
        // Presumably a null result makes RetryUtils keep polling; non-null ends the retry loop.
        if (getDataAvailabilityResponse.isDataAvailable()) {
          return getDataAvailabilityResponse;
        } else {
          return null;
        }
      });
    } finally {
      reducerWaitTimeStopwatch.stop();
    }

    logger.info("Finished waiting for all mappers to finish, partition: {}, duration: {} seconds",
        appShufflePartitionId, (System.currentTimeMillis() - startTime)/1000);

    GetDataAvailabilityResponse getDataAvailabilityRetryResult = getDataAvailabilityRetryLastResult.getObject();

    // Throw exception if no polled response indicated that all mappers are finished
    if (getDataAvailabilityRetryResult == null || !getDataAvailabilityRetryResult.isDataAvailable()) {
      throw new RssShuffleDataNotAvailableException(String.format(
          "Not all mappers finished after trying %s:%s for %s millis, partition: %s, %s",
          host, port, dataAvailableWaitTime, appShufflePartitionId,
          describeCommittedTaskAttempts(getDataAvailabilityRetryResult)));
    }

    // Validate into locals first and publish to the fields only once every check has passed,
    // keeping this method failure-atomic with respect to the client's committed state.
    MapTaskCommitStatus mapTaskCommitStatus = getDataAvailabilityRetryResult.getMapTaskCommitStatus();
    if (mapTaskCommitStatus == null) {
      throw new RssInvalidDataException("MapTaskCommitStatus should not be null");
    }
    HashSet<Long> committedTaskAttemptIds = new HashSet<>(mapTaskCommitStatus.getTaskAttemptIds().values());
    if (!committedTaskAttemptIds.containsAll(fetchTaskAttemptIds)) {
      throw new RssInvalidDataException(String.format(
          "Task attempt ids not matched, committed: %s, fetching: %s",
          committedTaskAttemptIds,
          fetchTaskAttemptIds));
    }

    this.commitMapTaskCommitStatus = mapTaskCommitStatus;
    this.commitTaskAttemptIds = committedTaskAttemptIds;

    return getDataAvailabilityRetryResult;
  }

  /**
   * Builds the task-attempt portion of the "not all mappers finished" error message, to help
   * debugging.
   *
   * @param response the last polled response; may be null
   * @return an empty string when there is no response or it has no map task commit status,
   *     "no task attempt committed" when no attempt has committed, otherwise the sorted committed
   *     task attempt ids alongside the sorted ids this client is fetching
   */
  private String describeCommittedTaskAttempts(GetDataAvailabilityResponse response) {
    if (response == null || response.getMapTaskCommitStatus() == null) {
      return "";
    }
    MapTaskCommitStatus mapTaskCommitStatus = response.getMapTaskCommitStatus();
    if (mapTaskCommitStatus.getTaskAttemptIds().isEmpty()) {
      return "no task attempt committed";
    }
    List<Long> taskAttemptIds = mapTaskCommitStatus.getTaskAttemptIds().values().stream().collect(Collectors.toList());
    Collections.sort(taskAttemptIds);
    return String.format("committed task ids: %s, fetching tasks: %s",
        StringUtils.toString4SortedIntList(taskAttemptIds),
        StringUtils.toString4SortedIntList(fetchTaskAttemptIds.stream().sorted().collect(Collectors.toList())));
  }