public DataPusher()

in client/src/main/java/org/apache/celeborn/client/write/DataPusher.java [63:141]


  public DataPusher(
      int shuffleId,
      int mapId,
      int attemptId,
      long taskId,
      int numMappers,
      int numPartitions,
      CelebornConf conf,
      ShuffleClient client,
      LinkedBlockingQueue<PushTask> pushTasks,
      Consumer<Integer> afterPush,
      LongAdder[] mapStatusLengths)
      throws InterruptedException {
    final int pushQueueCapacity = conf.clientPushQueueCapacity();
    final int pushBufferMaxSize = conf.clientPushBufferMaxSize();

    if (pushTasks == null) {
      idleQueue = new LinkedBlockingQueue<>(pushQueueCapacity);
    } else {
      idleQueue = pushTasks;
    }
    dataPushQueue =
        new DataPushQueue(
            conf, this, client, shuffleId, mapId, attemptId, numMappers, numPartitions);

    for (int i = idleQueue.size(); i < pushQueueCapacity; i++) {
      idleQueue.put(new PushTask(pushBufferMaxSize));
    }

    this.shuffleId = shuffleId;
    this.mapId = mapId;
    this.attemptId = attemptId;
    this.numMappers = numMappers;
    this.numPartitions = numPartitions;
    this.client = client;
    this.afterPush = afterPush;
    this.mapStatusLengths = mapStatusLengths;

    pushThread =
        new Thread("DataPusher-" + taskId) {
          private void reclaimTask(PushTask task) throws InterruptedException {
            idleLock.lockInterruptibly();
            try {
              idleQueue.put(task);
              if (idleQueue.remainingCapacity() == 0) {
                idleFull.signal();
              }
            } catch (InterruptedException e) {
              logger.error("DataPusher thread interrupted while reclaiming data.");
              throw e;
            } finally {
              idleLock.unlock();
            }
          }

          @Override
          public void run() {
            while (stillRunning()) {
              try {
                ArrayList<PushTask> tasks = dataPushQueue.takePushTasks();
                for (int i = 0; i < tasks.size(); i++) {
                  PushTask task = tasks.get(i);
                  pushData(task);
                  reclaimTask(task);
                }
              } catch (CelebornIOException e) {
                exceptionRef.set(e);
              } catch (IOException e) {
                exceptionRef.set(new CelebornIOException(e));
              } catch (InterruptedException e) {
                logger.error("DataPusher push thread interrupted while pushing data.");
                break;
              }
            }
          }
        };
    pushThread.setDaemon(true);
    pushThread.start();
  }