in server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java [123:236]
  public void processFlushEvent(ShuffleDataFlushEvent event) throws Exception {
    try {
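      // Track the number of in-flight flush events; the gauge is decremented in
      // the finally block below, so dropped and retried events are released too.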
      ShuffleServerMetrics.gaugeWriteHandler.inc();
      if (!event.isValid()) {
        LOG.warn(
            "AppId {} was removed already, event: {} should be dropped", event.getAppId(), event);
        // this must be caught by the caller to avoid duplicate cleanup.
        throw new EventInvalidException();
      }
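      // Give up on events that have exhausted their retry budget; they are
      // discarded instead of being queued for yet another attempt.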
      if (reachRetryMax(event)) {
        LOG.error("The event: {} has reached the max retry times and will be dropped.", event);
        throw new EventDiscardException();
      }
      Collection<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
      if (CollectionUtils.isEmpty(blocks)) {
        LOG.info("There is no block to be flushed: {}", event);
        return;
      }
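      // The under storage is expected to have been selected before the event
      // reaches this point (presumably by the storage manager); a null storage
      // indicates a bug, so the event is discarded.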
      Storage storage = event.getUnderStorage();
      if (storage == null) {
        LOG.error("The selected storage is null, which should never happen. event: {}", event);
        throw new EventDiscardException();
      }
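      // Events can sit in a pending state (e.g., while waiting for writable
      // storage); drop them once they have been pending longer than
      // pendingEventTimeoutSec.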
      if (event.isPended()
          && System.currentTimeMillis() - event.getStartPendingTime()
              > pendingEventTimeoutSec * 1000L) {
        LOG.error(
            "Flush event has been pending for over {} sec, the event {} is dropped",
            pendingEventTimeoutSec,
            event);
        throw new EventDiscardException();
      }
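      // A storage that currently cannot accept writes (the exact conditions
      // depend on the Storage implementation) makes the event retryable
      // rather than dropped.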
      if (!storage.canWrite()) {
        LOG.error(
            "The event: {} cannot be flushed because storage: {} can't write", event, storage);
        throw new EventRetryException();
      }
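      // Resolve the user owning this app, falling back to an empty string when
      // no mapping is known.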
      String user =
          StringUtils.defaultString(
              shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
              StringUtils.EMPTY);
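      // Build the request describing where and how this event's partition range
      // should be written.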
      int maxConcurrencyPerPartitionToWrite = getMaxConcurrencyPerPartitionWrite(event);
      CreateShuffleWriteHandlerRequest request =
          new CreateShuffleWriteHandlerRequest(
              this.shuffleServerConf,
              storageType,
              event.getAppId(),
              event.getShuffleId(),
              event.getStartPartition(),
              event.getEndPartition(),
              storageBasePaths.toArray(new String[0]),
              getShuffleServerId(),
              hadoopConf,
              storageDataReplica,
              user,
              maxConcurrencyPerPartitionToWrite);
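      // Handler creation can fail for transient reasons, so a failure marks the
      // event as retryable instead of discarding it.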
      ShuffleWriteHandlerWrapper handlerWrapper;
      try {
        handlerWrapper = storage.getOrCreateWriteHandler(request);
      } catch (Exception e) {
        LOG.warn("Failed to create the write handler for event: {}", event, e);
        throw new EventRetryException(e);
      }
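      // Delegate the actual write to the storage manager; an unsuccessful write
      // makes the event eligible for another flush attempt.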
      long startTime = System.currentTimeMillis();
      boolean writeSuccess = storageManager.write(storage, handlerWrapper.getHandler(), event);
      if (!writeSuccess) {
        throw new EventRetryException();
      }
      long endTime = System.currentTimeMillis();
      ShuffleTaskInfo shuffleTaskInfo =
          shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(event.getAppId());
      if (shuffleTaskInfo == null || !storageTypeWithMemory) {
        // A storage type with memory never needs cachedBlockIds,
        // since the client does not need to call the finish shuffle RPC.
        // Otherwise, update the committed block ids for the shuffle task.
        updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), event.getShuffleBlocks());
      }
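      // Optionally emit a pipe-delimited audit record covering the partition
      // range, target storage, data length, and write duration, e.g.
      // (illustrative values; the type token comes from AuditType.WRITE.getValue()):
      //   WRITE|app-1|0|0_1|localhost|/data/rss|1048576|...|...|1000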
      if (isStorageAuditLogEnabled) {
        AUDIT_LOGGER.info(
            String.format(
                "%s|%s|%d|%s|%s|%s|%d|%s|%s|%d",
                AuditType.WRITE.getValue(),
                event.getAppId(),
                event.getShuffleId(),
                event.getStartPartition() + "_" + event.getEndPartition(),
                event.getUnderStorage().getStorageHost(),
                event.getUnderStorage().getStoragePath(),
                event.getDataLength(),
                DateFormatUtils.format(startTime, AUDIT_DATE_PATTERN),
                DateFormatUtils.format(endTime, AUDIT_DATE_PATTERN),
                endTime - startTime));
      }
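      // Account the flushed bytes against the app, split by whether the data
      // landed on local file storage or on Hadoop-compatible storage.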
      if (shuffleTaskInfo != null) {
        String storageHost = event.getUnderStorage().getStorageHost();
        if (LocalStorage.STORAGE_HOST.equals(storageHost)) {
          shuffleTaskInfo.addOnLocalFileDataSize(
              event.getEncodedLength(), handlerWrapper.isNewlyCreated());
        } else {
          shuffleTaskInfo.addOnHadoopDataSize(
              event.getEncodedLength(), handlerWrapper.isNewlyCreated());
        }
      }
    } finally {
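      // Always release the in-flight gauge, no matter how the event ended.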
      ShuffleServerMetrics.gaugeWriteHandler.dec();
    }
  }