client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java [312:377]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Submits a task to {@code sendExecutorService} that sends the given blocks through {@code
   * shuffleWriteClient}.
   *
   * <p>The block ids reported in the send result are added to {@code successBlockIds} and {@code
   * failedBlockIds}. If anything throws, the throwable is only logged and neither set is updated.
   * In every case the task finally decreases {@code memoryUsedSize} and {@code inSendListBytes} by
   * the amount accumulated from {@code getFreeMemory()} and signals all waiters on {@code full}
   * while holding {@code memoryLock}.
   *
   * @param shuffleBlocks the blocks to send
   */
  private void sendShuffleBlocks(List<ShuffleBlockInfo> shuffleBlocks) {
    Runnable sendTask =
        () -> {
          long releasableBytes = 0;
          try {
            // Sum up first, so the counters can be decreased even if the send itself fails.
            for (ShuffleBlockInfo blockInfo : shuffleBlocks) {
              releasableBytes += blockInfo.getFreeMemory();
            }
            SendShuffleDataResult sendResult =
                shuffleWriteClient.sendShuffleData(appId, shuffleBlocks, () -> false);
            successBlockIds.addAll(sendResult.getSuccessBlockIds());
            failedBlockIds.addAll(sendResult.getFailedBlockIds());
          } catch (Throwable cause) {
            LOG.warn("send shuffle data exception ", cause);
          } finally {
            final long delta = -releasableBytes;
            memoryLock.lock();
            try {
              if (LOG.isDebugEnabled()) {
                LOG.debug("memoryUsedSize {} decrease {}", memoryUsedSize, releasableBytes);
              }
              memoryUsedSize.addAndGet(delta);
              inSendListBytes.addAndGet(delta);
              // Wake up every thread waiting on the "full" condition.
              full.signalAll();
            } finally {
              memoryLock.unlock();
            }
          }
        };
    sendExecutorService.submit(sendTask);
  }

  public void waitSendFinished() {
    while (!waitSendBuffers.isEmpty()) {
      sendBuffersToServers();
    }
    long start = System.currentTimeMillis();
    while (true) {
      checkFailedBlocks();
      // remove the blockIds that were sent successfully; if none are left, all data has been sent
      allBlockIds.removeAll(successBlockIds);
      if (allBlockIds.isEmpty()) {
        break;
      }
      LOG.info("Wait " + allBlockIds.size() + " blocks sent to shuffle server");
      Uninterruptibles.sleepUninterruptibly(sendCheckInterval, TimeUnit.MILLISECONDS);
      if (System.currentTimeMillis() - start > sendCheckTimeout) {
        String errorMsg =
            "Timeout: failed because "
                + allBlockIds.size()
                + " blocks can't be sent to shuffle server in "
                + sendCheckTimeout
                + " ms.";
        LOG.error(errorMsg);
        throw new RssException(errorMsg);
      }
    }
    long commitDuration = 0;
    if (!isMemoryShuffleEnabled) {
      long s = System.currentTimeMillis();
      sendCommit();
      commitDuration = System.currentTimeMillis() - s;
    }

    start = System.currentTimeMillis();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java [299:363]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Submits a task to {@code sendExecutorService} that sends the given blocks through {@code
   * shuffleWriteClient}.
   *
   * <p>The block ids reported in the send result are added to {@code successBlockIds} and {@code
   * failedBlockIds}. If anything throws, the throwable is only logged and neither set is updated.
   * In every case the task finally decreases {@code memoryUsedSize} and {@code inSendListBytes} by
   * the amount accumulated from {@code getFreeMemory()} and signals all waiters on {@code full}
   * while holding {@code memoryLock}.
   *
   * @param shuffleBlocks the blocks to send
   */
  private void sendShuffleBlocks(List<ShuffleBlockInfo> shuffleBlocks) {
    Runnable sendTask =
        () -> {
          long bytesToRelease = 0;
          try {
            // Sum up first, so the counters can be decreased even if the send itself fails.
            for (ShuffleBlockInfo pending : shuffleBlocks) {
              bytesToRelease += pending.getFreeMemory();
            }
            SendShuffleDataResult outcome =
                shuffleWriteClient.sendShuffleData(appId, shuffleBlocks, () -> false);
            successBlockIds.addAll(outcome.getSuccessBlockIds());
            failedBlockIds.addAll(outcome.getFailedBlockIds());
          } catch (Throwable error) {
            LOG.warn("send shuffle data exception ", error);
          } finally {
            final long delta = -bytesToRelease;
            memoryLock.lock();
            try {
              if (LOG.isDebugEnabled()) {
                LOG.debug("memoryUsedSize {} decrease {}", memoryUsedSize, bytesToRelease);
              }
              memoryUsedSize.addAndGet(delta);
              inSendListBytes.addAndGet(delta);
              // Wake up every thread waiting on the "full" condition.
              full.signalAll();
            } finally {
              memoryLock.unlock();
            }
          }
        };
    sendExecutorService.submit(sendTask);
  }

  /** wait send finished */
  public void waitSendFinished() {
    while (!waitSendBuffers.isEmpty()) {
      sendBuffersToServers();
    }
    long start = System.currentTimeMillis();
    while (true) {
      checkFailedBlocks();
      allBlockIds.removeAll(successBlockIds);
      if (allBlockIds.isEmpty()) {
        break;
      }
      LOG.info("Wait " + allBlockIds.size() + " blocks sent to shuffle server");
      Uninterruptibles.sleepUninterruptibly(sendCheckInterval, TimeUnit.MILLISECONDS);
      if (System.currentTimeMillis() - start > sendCheckTimeout) {
        String errorMsg =
            "Timeout: failed because "
                + allBlockIds.size()
                + " blocks can't be sent to shuffle server in "
                + sendCheckTimeout
                + " ms.";
        LOG.error(errorMsg);
        throw new RssException(errorMsg);
      }
    }
    long commitDuration = 0;
    if (!isMemoryShuffleEnabled) {
      long s = System.currentTimeMillis();
      sendCommit();
      commitDuration = System.currentTimeMillis() - s;
    }
    start = System.currentTimeMillis();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



