in server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java [125:216]
/**
 * Tries to write the blocks carried by a flush event to its underlying storage.
 *
 * <p>The method returns {@code true} without writing anything when the event is no longer valid,
 * carries no blocks, has no storage selected, has been pending longer than
 * {@code pendingEventTimeoutSec}, or has already been through the pending queue once and still
 * cannot be written. In the last case the event is dropped rather than re-queued.
 *
 * <p>When the storage is not writable, or the write fails, and the event has not been pended
 * before and its retry count is within {@code retryMax}, the retry count is increased, the event
 * is marked as pended and handed back to {@code eventHandler}.
 *
 * <p>Any {@link Throwable} raised while processing is logged and swallowed so the calling flush
 * thread keeps running; the event's retry count is increased in that case.
 *
 * @param event the flush event to process
 * @return {@code true} if the data was written or the event was dropped / had nothing to flush;
 *     {@code false} if the storage was not writable or the write did not succeed. If an exception
 *     is caught, the value reflects whether the write itself had already succeeded.
 */
private boolean flushToFile(ShuffleDataFlushEvent event) {
  boolean writeSuccess = false;
  try {
    if (!event.isValid()) {
      LOG.warn(
          "AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
      return true;
    }
    List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
    if (blocks == null || blocks.isEmpty()) {
      LOG.info("There is no block to be flushed: {}", event);
      return true;
    }
    Storage storage = event.getUnderStorage();
    if (storage == null) {
      LOG.error("Storage selected is null and this should not happen. event: {}", event);
      return true;
    }
    // An event that has been waiting in the pending queue for too long is dropped and counted.
    if (event.isPended()
        && System.currentTimeMillis() - event.getStartPendingTime()
            > pendingEventTimeoutSec * 1000L) {
      ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
      LOG.error(
          "Flush event cannot be flushed for {} sec, the event {} is dropped",
          pendingEventTimeoutSec,
          event);
      return true;
    }
    if (!storage.canWrite()) {
      // todo: Could we add an interface supportPending for storageManager
      //       to unify following logic of multiple different storage managers
      if (event.getRetryTimes() <= retryMax) {
        if (event.isPended()) {
          LOG.error(
              "Drop this event directly due to already having entered pending queue. event: {}",
              event);
          return true;
        }
        event.increaseRetryTimes();
        ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
        event.markPended();
        eventHandler.handle(event);
      }
      return false;
    }
    String user =
        StringUtils.defaultString(
            shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
            StringUtils.EMPTY);
    int maxConcurrencyPerPartitionToWrite = getMaxConcurrencyPerPartitionWrite(event);
    CreateShuffleWriteHandlerRequest request =
        new CreateShuffleWriteHandlerRequest(
            storageType,
            event.getAppId(),
            event.getShuffleId(),
            event.getStartPartition(),
            event.getEndPartition(),
            storageBasePaths.toArray(new String[0]),
            getShuffleServerId(),
            hadoopConf,
            storageDataReplica,
            user,
            maxConcurrencyPerPartitionToWrite);
    ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
    writeSuccess = storageManager.write(storage, handler, event);
    if (writeSuccess) {
      updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), blocks);
      ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
    } else if (event.getRetryTimes() <= retryMax) {
      if (event.isPended()) {
        LOG.error(
            "Drop this event directly due to already having entered pending queue. event: {}",
            event);
        // Actually drop the event, mirroring the non-writable storage branch above; without
        // this return the event would be re-queued despite the message just logged.
        return true;
      }
      event.increaseRetryTimes();
      ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
      event.markPended();
      eventHandler.handle(event);
    }
  } catch (Throwable throwable) {
    // just log the error, don't throw the exception and stop the flush thread
    LOG.error("Exception happened when process flush shuffle data for {}", event, throwable);
    event.increaseRetryTimes();
  }
  return writeSuccess;
}