in server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java [114:192]
/**
 * Consumes a single flush event while holding the read lock of the event's application, then
 * updates the related {@link ShuffleServerMetrics} counters and gauges.
 *
 * <p>Outcomes handled here:
 *
 * <ul>
 *   <li>success: the per-storage success counter is bumped (when a storage is given) and the
 *       event is cleaned up;
 *   <li>{@code EventRetryException}: the event is marked as pended, its retry count is
 *       increased and it is handed back to {@link #handle}; it is NOT cleaned up;
 *   <li>{@code EventDiscardException}: the event is counted as dropped/failed, cleaned up and an
 *       error is logged;
 *   <li>{@code EventInvalidException}: the event is counted as dropped/failed and cleaned up
 *       without logging;
 *   <li>any other exception: the event is counted as dropped/failed, the error is logged and
 *       the event is cleaned up.
 * </ul>
 *
 * <p>Regardless of the outcome, the flush counter / queue-size gauge matching the storage type
 * is updated before returning.
 *
 * @param event the flush event to consume
 * @param storage the storage selected for the event; may be {@code null}, in which case only
 *     the fallback queue-size gauge is touched and no per-storage counter is updated
 */
private void handleEventAndUpdateMetrics(ShuffleDataFlushEvent event, Storage storage) {
  final long startMillis = System.currentTimeMillis();
  final String appId = event.getAppId();
  final ReentrantReadWriteLock.ReadLock appReadLock =
      shuffleServer.getShuffleTaskManager().getAppReadLock(appId);
  final boolean hasStorage = storage != null;

  try {
    // The lock is held only for the duration of the actual consumption.
    appReadLock.lock();
    try {
      consumeEvent(event);
    } finally {
      appReadLock.unlock();
    }

    if (hasStorage) {
      ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
    }
    event.doCleanup();

    if (LOG.isDebugEnabled()) {
      long elapsedMillis = System.currentTimeMillis() - startMillis;
      LOG.debug(
          "Flush event:{} successfully in {} ms and release {} bytes",
          event,
          elapsedMillis,
          event.getEncodedLength());
    }
  } catch (Exception e) {
    if (e instanceof EventRetryException) {
      // Retryable failure: re-submit the event instead of releasing it.
      event.increaseRetryTimes();
      event.markPended();
      if (hasStorage) {
        ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
      }
      this.handle(event);
    } else {
      // Every non-retryable failure counts as a dropped and failed-written event.
      ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
      ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();

      if (e instanceof EventDiscardException) {
        if (hasStorage) {
          ShuffleServerMetrics.incStorageFailedCounter(storage.getStorageHost());
        }
        event.doCleanup();
        long elapsedMillis = System.currentTimeMillis() - startMillis;
        LOG.error(
            "Flush event: {} failed in {} ms and release {} bytes. This will make data lost.",
            event,
            elapsedMillis,
            event.getEncodedLength());
      } else if (e instanceof EventInvalidException) {
        event.doCleanup();
      } else {
        long elapsedMillis = System.currentTimeMillis() - startMillis;
        LOG.error(
            "Unexpected exceptions happened when handling the flush event: [{}] (it cost {} ms), due to ",
            event,
            elapsedMillis,
            e);
        // We need to release the memory when unexpected exceptions happened
        event.doCleanup();
      }
    }
  } finally {
    // instanceof is false for null, so a missing storage falls through to the fallback gauge.
    if (storage instanceof HadoopStorage) {
      ShuffleServerMetrics.counterHadoopEventFlush.inc();
      ShuffleServerMetrics.gaugeHadoopFlushThreadPoolQueueSize.dec();
    } else if (storage instanceof LocalStorage) {
      ShuffleServerMetrics.counterLocalFileEventFlush.inc();
      ShuffleServerMetrics.gaugeLocalfileFlushThreadPoolQueueSize.dec();
    } else {
      ShuffleServerMetrics.gaugeFallbackFlushThreadPoolQueueSize.dec();
    }
  }
}