public void run()

in client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java [476:562]


    /**
     * Fetch loop: repeatedly requests sorted shuffle data from the shuffle server until all merged
     * data has been consumed, {@code stop} is set, or an error occurs.
     *
     * <p>Behavior by server merge state:
     * <ul>
     *   <li>non-SUCCESS status or INTERNAL_ERROR / INITED state: record {@code fetchError} and fail
     *       over via {@code nextShuffleServerInfo()}</li>
     *   <li>MERGING with nextBlockId == -1: merged data exhausted but merge still running — back off
     *       with exponential sleep capped at {@code maxFetchSleepTime}</li>
     *   <li>DONE with nextBlockId == -1: flush the remaining {@code recordBuffer} and mark
     *       {@code nextQueue}'s producer done</li>
     *   <li>MERGING or DONE with a valid nextBlockId: read the block's records into
     *       {@code recordBuffer}, handing full buffers to {@code nextQueue}</li>
     *   <li>anything else: treated as a bad offset — record {@code fetchError} and fail over</li>
     * </ul>
     *
     * <p>Any throwable stops the loop and is stored in {@code error} for the consumer side.
     */
    public void run() {
      while (!stop) {
        try {
          RssGetSortedShuffleDataRequest request =
              new RssGetSortedShuffleDataRequest(
                  appId, shuffleId, partitionId, blockId, retryMax, retryIntervalMax);
          RssGetSortedShuffleDataResponse response = client.getSortedShuffleData(request);
          if (response.getStatusCode() != StatusCode.SUCCESS
              || response.getMergeState() == MergeState.INTERNAL_ERROR.code()) {
            fetchError = response.getMessage();
            nextShuffleServerInfo();
            break;
          } else if (response.getMergeState() == MergeState.INITED.code()) {
            fetchError = "Remote merge should be started!";
            nextShuffleServerInfo();
            break;
          }
          if (response.getMergeState() == MergeState.MERGING.code()
              && response.getNextBlockId() == -1) {
            // All merged data has been read, but there may be data that has not yet been merged. So
            // wait done! Exponential backoff, capped at maxFetchSleepTime.
            LOG.info("RMRecordsFetcher will sleep {} ms", sleepTime);
            Thread.sleep(this.sleepTime);
            this.sleepTime = Math.min(this.sleepTime * 2, maxFetchSleepTime);
          } else if (response.getMergeState() == MergeState.DONE.code()
              && response.getNextBlockId() == -1) {
            // All data has been read. Send the last records.
            if (recordBuffer.size() > 0) {
              nextQueue.put(recordBuffer);
            }
            nextQueue.setProducerDone(true);
            break;
          } else if (response.getMergeState() == MergeState.DONE.code()
              || response.getMergeState() == MergeState.MERGING.code()) {
            // A block is available: reset the backoff and consume its records.
            this.sleepTime = initFetchSleepTime;
            blockId = response.getNextBlockId();
            ManagedBuffer managedBuffer = null;
            ByteBuf byteBuf = null;
            RecordsReader<K, V> reader = null;
            try {
              managedBuffer = response.getData();
              byteBuf = managedBuffer.byteBuf();
              // Fetch blocks and parsing blocks are a synchronous process. If the two processes are
              // split into two different threads, then will be asynchronous processes. Although it
              // seems to save time, it actually consumes more memory.
              reader =
                  new RecordsReader<>(
                      rssConf,
                      SerInputStream.newInputStream(byteBuf),
                      keyClass,
                      valueClass,
                      raw,
                      false);
              reader.init();
              while (reader.next()) {
                if (metrics != null) {
                  metrics.incRecordsRead(1);
                }
                // Hand off a full buffer before appending the next record.
                if (recordBuffer.size() >= maxRecordsNumPerBuffer) {
                  nextQueue.put(recordBuffer);
                  recordBuffer = new RecordBuffer<>(partitionId);
                }
                recordBuffer.addRecord(reader.getCurrentKey(), reader.getCurrentValue());
              }
            } finally {
              // Always release the reader and both buffer handles to avoid native-memory leaks.
              if (reader != null) {
                reader.close();
              }
              if (byteBuf != null) {
                byteBuf.release();
              }
              if (managedBuffer != null) {
                managedBuffer.release();
              }
            }
          } else {
            fetchError = "Receive wrong offset from server, offset is " + response.getNextBlockId();
            nextShuffleServerInfo();
            break;
          }
        } catch (InterruptedException e) {
          // Restore the interrupt status so the owner of this thread can observe cancellation;
          // the previous catch-all silently swallowed it.
          Thread.currentThread().interrupt();
          error = e;
          stop = true;
          LOG.error("Interrupted when fetch sorted record, caused by ", e);
        } catch (Throwable e) {
          error = e;
          stop = true;
          // This exception terminates the fetcher, so log at error level (was info).
          LOG.error("Found exception when fetch sorted record, caused by ", e);
        }
      }
    }