private boolean flushToFile()

in server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java [125:216]


  /**
   * Flushes the blocks of a single {@link ShuffleDataFlushEvent} to its selected storage.
   *
   * <p>Returns {@code true} when the event needs no further processing — either the write
   * succeeded or the event was deliberately dropped (invalid app, empty blocks, missing
   * storage, pending timeout, or already-pended event that failed again). Returns
   * {@code false} when the event was re-enqueued for retry or the write failed without a
   * retry being scheduled.
   *
   * @param event the flush event carrying the blocks to persist
   * @return {@code true} if the event is fully handled (flushed or dropped), {@code false}
   *     otherwise
   */
  private boolean flushToFile(ShuffleDataFlushEvent event) {
    boolean writeSuccess = false;

    try {
      // The owning application was unregistered; the data is obsolete, treat as handled.
      if (!event.isValid()) {
        LOG.warn(
            "AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
        return true;
      }

      List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
      if (blocks == null || blocks.isEmpty()) {
        LOG.info("There is no block to be flushed: {}", event);
        return true;
      }

      Storage storage = event.getUnderStorage();
      if (storage == null) {
        LOG.error("Storage selected is null and this should not happen. event: {}", event);
        return true;
      }

      // Events that sat in the pending queue longer than the timeout are dropped outright.
      if (event.isPended()
          && System.currentTimeMillis() - event.getStartPendingTime()
              > pendingEventTimeoutSec * 1000L) {
        ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
        LOG.error(
            "Flush event cannot be flushed for {} sec, the event {} is dropped",
            pendingEventTimeoutSec,
            event);
        return true;
      }

      if (!storage.canWrite()) {
        // todo: Could we add an interface supportPending for storageManager
        //       to unify following logic of multiple different storage managers
        if (event.getRetryTimes() <= retryMax) {
          // An event that already went through the pending queue once is not re-pended;
          // it is dropped to avoid cycling a stuck event forever.
          if (event.isPended()) {
            LOG.error(
                "Drop this event directly due to already having entered pending queue. event: {}",
                event);
            return true;
          }
          event.increaseRetryTimes();
          ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
          event.markPended();
          eventHandler.handle(event);
        }
        return false;
      }

      String user =
          StringUtils.defaultString(
              shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
              StringUtils.EMPTY);
      int maxConcurrencyPerPartitionToWrite = getMaxConcurrencyPerPartitionWrite(event);
      CreateShuffleWriteHandlerRequest request =
          new CreateShuffleWriteHandlerRequest(
              storageType,
              event.getAppId(),
              event.getShuffleId(),
              event.getStartPartition(),
              event.getEndPartition(),
              storageBasePaths.toArray(new String[storageBasePaths.size()]),
              getShuffleServerId(),
              hadoopConf,
              storageDataReplica,
              user,
              maxConcurrencyPerPartitionToWrite);
      ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
      writeSuccess = storageManager.write(storage, handler, event);
      if (writeSuccess) {
        updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), blocks);
        ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
      } else if (event.getRetryTimes() <= retryMax) {
        // Fix: previously this branch logged the "drop" message but still fell through and
        // re-enqueued the event, contradicting both the message and the canWrite() branch
        // above. An already-pended event that fails again is now actually dropped.
        if (event.isPended()) {
          LOG.error(
              "Drop this event directly due to already having entered pending queue. event: {}",
              event);
          return true;
        }
        event.increaseRetryTimes();
        ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
        event.markPended();
        eventHandler.handle(event);
      }
    } catch (Throwable throwable) {
      // just log the error, don't throw the exception and stop the flush thread
      LOG.error("Exception happened when process flush shuffle data for {}", event, throwable);
      event.increaseRetryTimes();
    }
    return writeSuccess;
  }