private ConnectDownloadResponse connectImpl()

in src/main/java/com/uber/rss/clients/DataBlockSocketReadClient.java [108:167]


  private ConnectDownloadResponse connectImpl() {
    if (socket != null) {
      throw new RssInvalidStateException(String.format("Already connected to server, cannot connect again: %s", connectionInfo));
    }

    logger.debug("Connecting to server: {}", connectionInfo);

    connectSocket();

    write(MessageConstants.DOWNLOAD_UPLINK_MAGIC_BYTE);
    write(MessageConstants.DOWNLOAD_UPLINK_VERSION_3);

    ConnectDownloadRequest connectRequest = new ConnectDownloadRequest(user, appShufflePartitionId, fetchTaskAttemptIds);


    ExceptionWrapper<RssException> exceptionWrapper = new ExceptionWrapper<>();

    Boolean succeeded = RetryUtils.retryUntilNotNull(dataAvailablePollInterval, dataAvailablePollInterval * POLL_INTERVAL_MAX_MULTIPLIER, dataAvailableWaitTime, () -> {
      try {
        writeControlMessageAndWaitResponseStatus(connectRequest);
        return Boolean.TRUE;
      } catch (RssShuffleCorruptedException ex) {
        throw new RssShuffleCorruptedException("Shuffle data corrupted for: " + appShufflePartitionId, ex);
      } catch (RssMissingShuffleWriteConfigException | RssShuffleStageNotStartedException ex) {
        exceptionWrapper.setException(ex);
        logger.warn(String.format("Did not find data in server side, server may not run fast enough to get data from client or server hits some issue, %s", appShufflePartitionId), ex);
        return null;
      }
    });

    if (succeeded == null || !succeeded.booleanValue()) {
      if (exceptionWrapper.getException() != null) {
        throw exceptionWrapper.getException();
      } else {
        throw new RssInvalidStateException(String.format("Failed to connect to server %s, %s", connectionInfo, appShufflePartitionId));
      }
    }

    ConnectDownloadResponse connectDownloadResponse = readResponseMessage(MessageConstants.MESSAGE_ConnectDownloadResponse, ConnectDownloadResponse::deserialize);

    logger.info("Connected to server: {}, response: {}", connectionInfo, connectDownloadResponse);

    fileCompressionCodec = connectDownloadResponse.getCompressionCodec();

    if (connectDownloadResponse.isDataAvailable()) {
      this.commitMapTaskCommitStatus = connectDownloadResponse.getMapTaskCommitStatus();
      if (this.commitMapTaskCommitStatus == null) {
        throw new RssInvalidDataException("MapTaskCommitStatus should not be null");
      }
      this.commitTaskAttemptIds = new HashSet<>(this.commitMapTaskCommitStatus.getTaskAttemptIds().values());
      if (!this.commitTaskAttemptIds.containsAll(fetchTaskAttemptIds)) {
        throw new RssInvalidDataException(String.format(
            "Task attempt ids not matched, committed: %s, fetching: %s",
            this.commitTaskAttemptIds,
            fetchTaskAttemptIds));
      }
    }

    return connectDownloadResponse;
  }