public void processFlushEvent()

in server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java [123:236]


  /**
   * Flushes the blocks carried by a single flush event to the storage selected for it.
   *
   * <p>The event is checked through a fixed sequence of guards before any data is written. The
   * order of the guards determines which exception is raised when several conditions hold at
   * once, so it must not be changed casually. The {@code gaugeWriteHandler} metric is incremented
   * on entry and always decremented on exit.
   *
   * <p>If the event carries no blocks, the method logs that fact and returns without writing.
   *
   * @param event the flush event to process
   * @throws EventInvalidException if the event is no longer valid
   * @throws EventDiscardException if the event reached the max retry count, has no storage
   *     selected, or has been pending for longer than the configured timeout
   * @throws EventRetryException if the storage cannot be written right now, the write handler
   *     cannot be created, or the write itself reports failure
   * @throws Exception declared by the signature; the three types above are the ones thrown
   *     directly here
   */
  public void processFlushEvent(ShuffleDataFlushEvent event) throws Exception {
    try {
      ShuffleServerMetrics.gaugeWriteHandler.inc();

      // Guard 1: the event is no longer valid (the log message attributes this to app removal).
      if (!event.isValid()) {
        LOG.warn(
            "AppId {} was removed already, event:{} should be dropped", event.getAppId(), event);
        // This dedicated exception should be caught by callers to avoid duplicate cleanup.
        throw new EventInvalidException();
      }

      // Guard 2: retry budget exhausted.
      if (reachRetryMax(event)) {
        LOG.error("The event:{} has been reached to max retry times, it will be dropped.", event);
        throw new EventDiscardException();
      }

      // Guard 3: nothing to flush; this is not an error.
      Collection<ShufflePartitionedBlock> pendingBlocks = event.getShuffleBlocks();
      if (CollectionUtils.isEmpty(pendingBlocks)) {
        LOG.info("There is no block to be flushed: {}", event);
        return;
      }

      // Guard 4: a storage must have been selected for the event.
      Storage targetStorage = event.getUnderStorage();
      if (targetStorage == null) {
        LOG.error("Storage selected is null and this should not happen. event: {}", event);
        throw new EventDiscardException();
      }

      // Guard 5: a pended event that has waited longer than the timeout is dropped.
      if (event.isPended()) {
        long pendedMillis = System.currentTimeMillis() - event.getStartPendingTime();
        if (pendedMillis > pendingEventTimeoutSec * 1000L) {
          LOG.error(
              "Flush event cannot be flushed for {} sec, the event {} is dropped",
              pendingEventTimeoutSec,
              event);
          throw new EventDiscardException();
        }
      }

      // Guard 6: the storage is currently not writable; ask for a retry instead of dropping.
      if (!targetStorage.canWrite()) {
        LOG.error(
            "The event: {} is limited to flush due to storage:{} can't write",
            event,
            targetStorage);
        throw new EventRetryException();
      }

      // Resolve the owning user; a missing user is replaced by the empty string.
      String owner =
          StringUtils.defaultString(
              shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
              StringUtils.EMPTY);
      int partitionWriteConcurrency = getMaxConcurrencyPerPartitionWrite(event);
      String[] basePaths = storageBasePaths.toArray(new String[storageBasePaths.size()]);
      CreateShuffleWriteHandlerRequest handlerRequest =
          new CreateShuffleWriteHandlerRequest(
              this.shuffleServerConf,
              storageType,
              event.getAppId(),
              event.getShuffleId(),
              event.getStartPartition(),
              event.getEndPartition(),
              basePaths,
              getShuffleServerId(),
              hadoopConf,
              storageDataReplica,
              owner,
              partitionWriteConcurrency);

      // Any failure while obtaining the handler is converted into a retry, keeping the cause.
      ShuffleWriteHandlerWrapper wrapper;
      try {
        wrapper = targetStorage.getOrCreateWriteHandler(handlerRequest);
      } catch (Exception e) {
        LOG.warn("Failed to create write handlerWrapper for event: {}", event, e);
        throw new EventRetryException(e);
      }

      // The end timestamp is only taken after a successful write.
      long writeStartMs = System.currentTimeMillis();
      if (!storageManager.write(targetStorage, wrapper.getHandler(), event)) {
        throw new EventRetryException();
      }
      long writeEndMs = System.currentTimeMillis();

      ShuffleTaskInfo taskInfo =
          shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(event.getAppId());

      // Committed block ids are recorded only when the task info is missing or the storage type
      // does not include memory. With a memory storage type the cached block ids should never
      // be needed, since the client does not need to call the finish shuffle rpc.
      boolean trackCommittedBlockIds = taskInfo == null || !storageTypeWithMemory;
      if (trackCommittedBlockIds) {
        updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), event.getShuffleBlocks());
      }

      // Optional audit record: one pipe-separated line describing this write.
      if (isStorageAuditLogEnabled) {
        String partitionRange = event.getStartPartition() + "_" + event.getEndPartition();
        String auditLine =
            String.format(
                "%s|%s|%d|%s|%s|%s|%d|%s|%s|%d",
                AuditType.WRITE.getValue(),
                event.getAppId(),
                event.getShuffleId(),
                partitionRange,
                event.getUnderStorage().getStorageHost(),
                event.getUnderStorage().getStoragePath(),
                event.getDataLength(),
                DateFormatUtils.format(writeStartMs, AUDIT_DATE_PATTERN),
                DateFormatUtils.format(writeEndMs, AUDIT_DATE_PATTERN),
                writeEndMs - writeStartMs);
        AUDIT_LOGGER.info(auditLine);
      }

      // Account the flushed size against local-file or Hadoop storage, based on the storage host.
      if (taskInfo != null) {
        boolean flushedToLocalFile =
            LocalStorage.STORAGE_HOST.equals(event.getUnderStorage().getStorageHost());
        if (flushedToLocalFile) {
          taskInfo.addOnLocalFileDataSize(event.getEncodedLength(), wrapper.isNewlyCreated());
        } else {
          taskInfo.addOnHadoopDataSize(event.getEncodedLength(), wrapper.isNewlyCreated());
        }
      }
    } finally {
      ShuffleServerMetrics.gaugeWriteHandler.dec();
    }
  }