private void handleEventAndUpdateMetrics()

in server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java [114:192]


  /**
   * Consumes a single flush event while holding the owning application's read lock and records
   * the outcome in {@code ShuffleServerMetrics}.
   *
   * <p>Outcomes handled by this method:
   *
   * <ul>
   *   <li>Success: the per-host storage success counter is bumped (when a storage is given) and
   *       the event is cleaned up.
   *   <li>{@code EventRetryException}: the retry count is increased, the event is marked as
   *       pended, and it is handed back to {@code handle(event)}; no cleanup is performed.
   *   <li>Any other exception: the dropped/failed-written counters are bumped and the event is
   *       cleaned up. {@code EventDiscardException} additionally bumps the per-host storage
   *       failure counter and is logged as data loss; {@code EventInvalidException} is handled
   *       without logging; everything else is logged as unexpected.
   * </ul>
   *
   * <p>Regardless of the outcome, the flush thread pool queue-size gauge matching the storage
   * type is decremented before returning.
   *
   * @param event the flush event to consume
   * @param storage the storage the event is associated with; may be {@code null}, in which case
   *     per-host storage counters are skipped and the fallback gauge is used
   */
  private void handleEventAndUpdateMetrics(ShuffleDataFlushEvent event, Storage storage) {
    final long beginMillis = System.currentTimeMillis();
    final String applicationId = event.getAppId();
    final ReentrantReadWriteLock.ReadLock appReadLock =
        shuffleServer.getShuffleTaskManager().getAppReadLock(applicationId);
    try {
      // The read lock only guards the consumption itself; metrics and cleanup run unlocked.
      appReadLock.lock();
      try {
        consumeEvent(event);
      } finally {
        appReadLock.unlock();
      }

      if (storage != null) {
        ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
      }
      event.doCleanup();

      if (LOG.isDebugEnabled()) {
        final long elapsedMillis = System.currentTimeMillis() - beginMillis;
        LOG.debug(
            "Flush event:{} successfully in {} ms and release {} bytes",
            event,
            elapsedMillis,
            event.getEncodedLength());
      }
    } catch (Exception e) {
      if (e instanceof EventRetryException) {
        // Retryable failure: hand the event back for another attempt without cleaning it up.
        event.increaseRetryTimes();
        event.markPended();
        if (storage != null) {
          ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
        }
        this.handle(event);
      } else {
        // Every non-retryable failure counts as a dropped and failed-written event.
        ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
        ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();

        if (e instanceof EventDiscardException) {
          if (storage != null) {
            ShuffleServerMetrics.incStorageFailedCounter(storage.getStorageHost());
          }
          event.doCleanup();
          final long elapsedMillis = System.currentTimeMillis() - beginMillis;
          LOG.error(
              "Flush event: {} failed in {} ms and release {} bytes. This will make data lost.",
              event,
              elapsedMillis,
              event.getEncodedLength());
        } else if (e instanceof EventInvalidException) {
          // Invalid events are cleaned up without any log output.
          event.doCleanup();
        } else {
          final long elapsedMillis = System.currentTimeMillis() - beginMillis;
          LOG.error(
              "Unexpected exceptions happened when handling the flush event: [{}] (it cost {} ms), due to ",
              event,
              elapsedMillis,
              e);
          // The memory still has to be released when an unexpected exception happened.
          event.doCleanup();
        }
      }
    } finally {
      // instanceof is false for null, so a missing storage falls through to the fallback gauge.
      if (storage instanceof HadoopStorage) {
        ShuffleServerMetrics.counterHadoopEventFlush.inc();
        ShuffleServerMetrics.gaugeHadoopFlushThreadPoolQueueSize.dec();
      } else if (storage instanceof LocalStorage) {
        ShuffleServerMetrics.counterLocalFileEventFlush.inc();
        ShuffleServerMetrics.gaugeLocalfileFlushThreadPoolQueueSize.dec();
      } else {
        ShuffleServerMetrics.gaugeFallbackFlushThreadPoolQueueSize.dec();
      }
    }
  }