src/main/java/com/uber/rss/clients/RecordSocketReadClient.java [44:111]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        this.dataBlockSocketReadClient = new DataBlockSocketReadClient(host, port, timeoutMillis, user, appShufflePartitionId, fetchTaskAttemptIds, dataAvailablePollInterval, dataAvailableWaitTime);
        this.metrics = new ReadClientMetrics(new ReadClientMetricsKey(this.getClass().getSimpleName(), user));
    }

    @Override
    public DownloadServerVerboseInfo connect() {
        try {
            ConnectDownloadResponse connectDownloadResponse = dataBlockSocketReadClient.connect();
            DownloadServerVerboseInfo downloadServerVerboseInfo = new DownloadServerVerboseInfo();
            downloadServerVerboseInfo.setId(connectDownloadResponse.getServerId());
            downloadServerVerboseInfo.setRunningVersion(connectDownloadResponse.getRunningVersion());
            downloadServerVerboseInfo.setMapTaskCommitStatus(connectDownloadResponse.getMapTaskCommitStatus());
            return downloadServerVerboseInfo;
        } catch (RuntimeException ex) {
            logger.warn(String.format("Failed to connect %s", this), ex);
            close();
            throw ex;
        }
    }

    @Override
    public void close() {
        try {
            dataBlockSocketReadClient.close();
        } catch (Throwable ex) {
            logger.warn(String.format("Failed to close %s", this), ex);
        }

        closeMetrics();
    }

    @Override
    public TaskDataBlock readDataBlock() {
        try {
            DataBlock dataBlock = dataBlockSocketReadClient.readDataBlock();
            if (dataBlock == null) {
                return null;
            }
            shuffleReadBytes += DataBlockHeader.NUM_BYTES + dataBlock.getPayload().length;
            return new TaskDataBlock(dataBlock.getPayload(), dataBlock.getHeader().getTaskAttemptId());
        } catch (RuntimeException ex) {
            logger.warn(String.format("Failed to read shuffle data %s", this), ex);
            close();
            throw ex;
        }
    }

    @Override
    public long getShuffleReadBytes() {
        return shuffleReadBytes;
    }

    @Override
    public String toString() {
        return "RecordSocketReadClient{" +
                "dataBlockSocketReadClient=" + dataBlockSocketReadClient +
                '}';
    }

    private void closeMetrics() {
        try {
            if (metrics != null) {
                metrics.close();
                metrics = null;
            }
        } catch (Throwable e) {
            M3Stats.addException(e, this.getClass().getSimpleName());
            logger.warn(String.format("Failed to close metrics: %s", this), e);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



src/main/java/com/uber/rss/clients/ShuffleDataSocketReadClient.java [44:111]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    this.dataBlockSocketReadClient = new DataBlockSocketReadClient(host, port, timeoutMillis, user, appShufflePartitionId, fetchTaskAttemptIds, dataAvailablePollInterval, dataAvailableWaitTime);
    this.metrics = new ReadClientMetrics(new ReadClientMetricsKey(this.getClass().getSimpleName(), user));
  }

  @Override
  public DownloadServerVerboseInfo connect() {
    try {
      ConnectDownloadResponse connectDownloadResponse = dataBlockSocketReadClient.connect();
      DownloadServerVerboseInfo downloadServerVerboseInfo = new DownloadServerVerboseInfo();
      downloadServerVerboseInfo.setId(connectDownloadResponse.getServerId());
      downloadServerVerboseInfo.setRunningVersion(connectDownloadResponse.getRunningVersion());
      downloadServerVerboseInfo.setMapTaskCommitStatus(connectDownloadResponse.getMapTaskCommitStatus());
      return downloadServerVerboseInfo;
    } catch (RuntimeException ex) {
      logger.warn(String.format("Failed to connect %s", this), ex);
      close();
      throw ex;
    }
  }

  @Override
  public void close() {
    try {
      dataBlockSocketReadClient.close();
    } catch (Throwable ex) {
      logger.warn(String.format("Failed to close %s", this), ex);
    }

    closeMetrics();
  }

  @Override
  public TaskDataBlock readDataBlock() {
    try {
      DataBlock dataBlock = dataBlockSocketReadClient.readDataBlock();
      if (dataBlock == null) {
        return null;
      }
      shuffleReadBytes += DataBlockHeader.NUM_BYTES + dataBlock.getPayload().length;
      return new TaskDataBlock(dataBlock.getPayload(), dataBlock.getHeader().getTaskAttemptId());
    } catch (RuntimeException ex) {
      logger.warn(String.format("Failed to read shuffle data %s", this), ex);
      close();
      throw ex;
    }
  }

  @Override
  public long getShuffleReadBytes() {
    return shuffleReadBytes;
  }

  @Override
  public String toString() {
    return "RecordSocketReadClient{" +
        "dataBlockSocketReadClient=" + dataBlockSocketReadClient +
        '}';
  }

  private void closeMetrics() {
    try {
      if (metrics != null) {
        metrics.close();
        metrics = null;
      }
    } catch (Throwable e) {
      M3Stats.addException(e, this.getClass().getSimpleName());
      logger.warn(String.format("Failed to close metrics: %s", this), e);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



