in master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java [228:302]
/**
 * Restores this manager's in-memory metadata from a snapshot file.
 *
 * <p>The file is parsed as a {@code PbSnapshotMetaInfo} protobuf message and its content is merged
 * into the existing state: collections and maps are added to (not cleared first), while the
 * {@code partitionTotalWritten} and {@code partitionTotalFileCount} counters are reset before the
 * snapshot values are added.
 *
 * <p>Every registered shuffle whose application has no heartbeat entry in the snapshot gets one
 * stamped with the current time. Each restored worker has its network location re-resolved with
 * this master's {@code rackResolver} instead of trusting the value stored in the snapshot.
 *
 * <p>If restoring fails part-way, the state that was already applied is not rolled back.
 *
 * @param file the snapshot file to read
 * @throws IOException if the file cannot be read or parsed, or if applying the snapshot fails;
 *     any exception raised while restoring is wrapped as the cause
 */
public void restoreMetaFromFile(File file) throws IOException {
  try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(file))) {
    PbSnapshotMetaInfo snapshotMetaInfo = PbSnapshotMetaInfo.parseFrom(in);

    estimatedPartitionSize = snapshotMetaInfo.getEstimatedPartitionSize();
    registeredShuffle.addAll(snapshotMetaInfo.getRegisteredShuffleList());
    hostnameSet.addAll(snapshotMetaInfo.getHostnameSetList());
    excludedWorkers.addAll(
        snapshotMetaInfo.getExcludedWorkersList().stream()
            .map(PbSerDeUtils::fromPbWorkerInfo)
            .collect(Collectors.toSet()));
    workerLostEvents.addAll(
        snapshotMetaInfo.getWorkerLostEventsList().stream()
            .map(PbSerDeUtils::fromPbWorkerInfo)
            .collect(Collectors.toSet()));

    appHeartbeatTime.putAll(snapshotMetaInfo.getAppHeartbeatTimeMap());
    // Make sure every application that still owns a registered shuffle has a heartbeat entry.
    registeredShuffle.forEach(
        shuffleKey -> {
          // Use the LAST '-' as the separator: an application id may itself contain '-', and
          // cutting at the first one would yield a truncated, bogus app id.
          // NOTE(review): assumes shuffle keys look like "<appId>-<shuffleId>" - confirm against
          // the code that builds the key. A key without any '-' is used as the app id as-is.
          int separator = shuffleKey.lastIndexOf('-');
          String appId = separator < 0 ? shuffleKey : shuffleKey.substring(0, separator);
          appHeartbeatTime.putIfAbsent(appId, System.currentTimeMillis());
        });

    workers.addAll(
        snapshotMetaInfo.getWorkersList().stream()
            .map(PbSerDeUtils::fromPbWorkerInfo)
            // De-duplicate before resolving so each distinct worker is resolved only once.
            .collect(Collectors.toSet())
            .stream()
            .map(
                workerInfo -> {
                  // Reset worker's network location with current master's configuration.
                  workerInfo.networkLocation_$eq(
                      rackResolver.resolve(workerInfo.host()).getNetworkLocation());
                  return workerInfo;
                })
            .collect(Collectors.toSet()));

    // Lost workers are stored in the snapshot keyed by the worker's unique id string.
    snapshotMetaInfo
        .getLostWorkersMap()
        .forEach(
            (uniqueId, value) -> lostWorkers.put(WorkerInfo.fromUniqueId(uniqueId), value));
    shutdownWorkers.addAll(
        snapshotMetaInfo.getShutdownWorkersList().stream()
            .map(PbSerDeUtils::fromPbWorkerInfo)
            .collect(Collectors.toSet()));

    // The counters are replaced (reset, then add) rather than accumulated.
    partitionTotalWritten.reset();
    partitionTotalWritten.add(snapshotMetaInfo.getPartitionTotalWritten());
    partitionTotalFileCount.reset();
    partitionTotalFileCount.add(snapshotMetaInfo.getPartitionTotalFileCount());

    appDiskUsageMetric.restoreFromSnapshot(
        snapshotMetaInfo.getAppDiskUsageMetricSnapshotsList().stream()
            .map(PbSerDeUtils::fromPbAppDiskUsageSnapshot)
            .toArray(AppDiskUsageSnapShot[]::new));
    appDiskUsageMetric.currentSnapShot_$eq(
        new AtomicReference<AppDiskUsageSnapShot>(
            PbSerDeUtils.fromPbAppDiskUsageSnapshot(
                snapshotMetaInfo.getCurrentAppDiskUsageMetricsSnapshot())));
  } catch (Exception e) {
    // Surface every failure (I/O, parsing, or applying the snapshot) as an IOException.
    throw new IOException(e);
  }
  LOG.info("Successfully restore meta info from snapshot {}", file.getAbsolutePath());
  LOG.info(
      "Worker size: {}, Registered shuffle size: {}. Worker excluded list size: {}.",
      workers.size(),
      registeredShuffle.size(),
      excludedWorkers.size());
  workers.forEach(workerInfo -> LOG.info(workerInfo.toString()));
  registeredShuffle.forEach(shuffle -> LOG.info("RegisteredShuffle {}", shuffle));
}