public void restoreMetaFromFile()

in master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java [383:491]


  /**
   * Replaces the in-memory master metadata with the contents of a snapshot file.
   *
   * <p>The file is first parsed as a {@code PbSnapshotMetaInfo}. Only after parsing succeeds is the
   * current state cleared via {@code cleanUpState()} and then repopulated from the snapshot:
   * registered shuffles, hostnames, excluded / manually excluded / lost / shutdown / decommissioned
   * workers, app heartbeats, worker event infos, partition and shuffle counters, shuffle fallback
   * counts and application metas. Applications that own registered shuffles but have no heartbeat
   * entry in the snapshot are given the current time as their heartbeat. Workers whose snapshot
   * network location is the default rack are re-resolved with this master's rack resolver.
   *
   * <p>NOTE: the state is mutated step by step with no rollback. If a failure occurs after {@code
   * cleanUpState()}, the in-memory metadata may be only partially restored.
   *
   * @param file snapshot file to read
   * @throws IOException if the file cannot be read, or if parsing or applying the snapshot fails;
   *     the original failure is kept as the cause and the message names the snapshot file
   */
  public void restoreMetaFromFile(File file) throws IOException {
    try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(file))) {
      PbSnapshotMetaInfo snapshotMetaInfo = PbSnapshotMetaInfo.parseFrom(in);
      // Wipe current state only once the snapshot has been parsed successfully.
      cleanUpState();

      estimatedPartitionSize = snapshotMetaInfo.getEstimatedPartitionSize();

      for (String shuffleKey : snapshotMetaInfo.getRegisteredShuffleList()) {
        Tuple2<String, Object> appIdShuffleId = Utils.splitShuffleKey(shuffleKey);
        registeredAppAndShuffles
            .computeIfAbsent(appIdShuffleId._1, v -> new HashSet<>())
            .add((Integer) appIdShuffleId._2);
      }
      hostnameSet.addAll(snapshotMetaInfo.getHostnameSetList());
      excludedWorkers.addAll(
          snapshotMetaInfo.getExcludedWorkersList().stream()
              .map(PbSerDeUtils::fromPbWorkerInfo)
              .collect(Collectors.toSet()));
      manuallyExcludedWorkers.addAll(
          snapshotMetaInfo.getManuallyExcludedWorkersList().stream()
              .map(PbSerDeUtils::fromPbWorkerInfo)
              .collect(Collectors.toSet()));
      workerLostEvents.addAll(
          snapshotMetaInfo.getWorkerLostEventsList().stream()
              .map(PbSerDeUtils::fromPbWorkerInfo)
              .collect(Collectors.toSet()));
      appHeartbeatTime.putAll(snapshotMetaInfo.getAppHeartbeatTimeMap());

      // Apps that own registered shuffles but have no heartbeat in the snapshot get "now".
      for (String appId : registeredAppAndShuffles.keySet()) {
        if (!appHeartbeatTime.containsKey(appId)) {
          appHeartbeatTime.put(appId, System.currentTimeMillis());
        }
      }

      Set<WorkerInfo> workerInfoSet =
          snapshotMetaInfo.getWorkersList().stream()
              .map(PbSerDeUtils::fromPbWorkerInfo)
              .collect(Collectors.toSet());
      List<String> workerHostList =
          workerInfoSet.stream()
              .filter(w -> NetworkTopology.DEFAULT_RACK.equals(w.networkLocation()))
              .map(WorkerInfo::host)
              .collect(Collectors.toList());
      scala.collection.immutable.Map<String, Node> resolveMap =
          rackResolver.resolveToMap(workerHostList);
      // Explicit loop instead of Stream.peek: this mutates each WorkerInfo, and peek is
      // documented as a debugging aid, not a place for required side effects.
      for (WorkerInfo workerInfo : workerInfoSet) {
        // Reset worker's network location with current master's configuration.
        if (NetworkTopology.DEFAULT_RACK.equals(workerInfo.networkLocation())) {
          scala.Option<Node> resolved = resolveMap.get(workerInfo.host());
          if (resolved.isDefined()) {
            workerInfo.networkLocation_$eq(resolved.get().getNetworkLocation());
          } else {
            // Previously an unchecked Option.get() here aborted the whole restore after the
            // state had already been cleared; keep the default rack instead.
            LOG.warn(
                "Cannot resolve network location for worker {}, keep {}",
                workerInfo.host(),
                NetworkTopology.DEFAULT_RACK);
          }
        }
        workersMap.put(workerInfo.toUniqueId(), workerInfo);
      }

      snapshotMetaInfo
          .getLostWorkersMap()
          .forEach((key, value) -> lostWorkers.put(WorkerInfo.fromUniqueId(key), value));

      snapshotMetaInfo
          .getWorkerEventInfosMap()
          .forEach(
              (key, value) ->
                  workerEventInfos.put(
                      WorkerInfo.fromUniqueId(key), PbSerDeUtils.fromPbWorkerEventInfo(value)));

      shutdownWorkers.addAll(
          snapshotMetaInfo.getShutdownWorkersList().stream()
              .map(PbSerDeUtils::fromPbWorkerInfo)
              .collect(Collectors.toSet()));

      decommissionWorkers.addAll(
          snapshotMetaInfo.getDecommissionWorkersList().stream()
              .map(PbSerDeUtils::fromPbWorkerInfo)
              .collect(Collectors.toSet()));

      partitionTotalWritten.add(snapshotMetaInfo.getPartitionTotalWritten());
      partitionTotalFileCount.add(snapshotMetaInfo.getPartitionTotalFileCount());
      shuffleTotalCount.add(snapshotMetaInfo.getShuffleTotalCount());
      addShuffleFallbackCounts(snapshotMetaInfo.getShuffleFallbackCountsMap());

      snapshotMetaInfo
          .getApplicationMetasMap()
          .forEach(
              (key, value) -> applicationMetas.put(key, PbSerDeUtils.fromPbApplicationMeta(value)));

      availableWorkers.addAll(
          workersMap.values().stream()
              .filter(this::isWorkerAvailable)
              .collect(Collectors.toSet()));
    } catch (Exception e) {
      // Name the snapshot in the message so a failed restore is diagnosable; keep the cause.
      throw new IOException(
          "Failed to restore meta info from snapshot " + file.getAbsolutePath(), e);
    }
    LOG.info("Successfully restore meta info from snapshot {}", file.getAbsolutePath());
    LOG.info(
        "Worker size: {}, Registered shuffle size: {}. Worker excluded list size: {}. Manually Excluded list size: {}",
        workersMap.size(),
        registeredAppAndShuffles.size(),
        excludedWorkers.size(),
        manuallyExcludedWorkers.size());
    workersMap.values().forEach(workerInfo -> LOG.info(workerInfo.toString()));
    registeredAppAndShuffles.forEach(
        (appId, shuffleId) -> LOG.info("RegisteredShuffle {}-{}", appId, shuffleId));
  }