private Map<TaskName, Checkpoint> restoreStores()

in samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java [258:372]


  /**
   * Restores the state of the task stores in this container and then creates the stores in {@code taskStores}.
   *
   * <p>The method proceeds in the following order:
   * <ol>
   *   <li>For every task in the container model, reads the last checkpoint (only for active tasks, and only when a
   *       checkpoint manager is configured) and creates the task's restore managers. Side input stores mapped to
   *       the blob store state backend factory are added to the stores to restore.</li>
   *   <li>If no checkpoint was read for any task, clears the logged stores under the logged store base
   *       directory.</li>
   *   <li>Initializes and runs all task restores and blocks until they complete, then merges any checkpoints
   *       returned by the restore into the checkpoints read in step 1.</li>
   *   <li>Stops each distinct store consumer once.</li>
   *   <li>Creates the non-side-input stores that are not in-memory stores, assigns them to {@code taskStores},
   *       and adds the existing in-memory stores to that map.</li>
   * </ol>
   *
   * @return the checkpoints keyed by task name: the non-null checkpoints read before the restore, updated with
   *         the checkpoints returned by the restore
   * @throws InterruptedException if the calling thread is interrupted while waiting for the restore to finish;
   *         {@code restoreExecutor.shutdownNow()} is invoked before the exception is rethrown
   * @throws SamzaException if the restore fails with any other exception (the original exception is the cause)
   */
  private Map<TaskName, Checkpoint> restoreStores() throws InterruptedException {
    LOG.info("Store Restore started");
    Set<TaskName> activeTasks = ContainerStorageManagerUtil.getTasks(containerModel, TaskMode.Active).keySet();
    // Find all non-side input stores
    Set<String> nonSideInputStoreNames = storageEngineFactories.keySet()
        .stream()
        .filter(storeName -> !sideInputStoreNames.contains(storeName))
        .collect(Collectors.toSet());

    // Neither of these depends on the task being processed, so compute them once instead of once (or twice) per task.
    StorageConfig storageConfig = new StorageConfig(config);
    String blobStoreStateBackendFactory = BlobStoreStateBackendFactory.class.getName();

    // Obtain the checkpoints for each task
    Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers = new HashMap<>();
    Map<TaskName, Checkpoint> taskCheckpoints = new HashMap<>();
    Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames = new HashMap<>();
    containerModel.getTasks().forEach((taskName, taskModel) -> {
      Checkpoint taskCheckpoint = null;
      if (checkpointManager != null && activeTasks.contains(taskName)) {
        // only pass in checkpoints for active tasks
        taskCheckpoint = checkpointManager.readLastCheckpoint(taskName);
        LOG.info("Obtained checkpoint: {} for state restore for taskName: {}", taskCheckpoint, taskName);
      }

      // Only insert non-null checkpoints
      if (taskCheckpoint != null) {
        taskCheckpoints.put(taskName, taskCheckpoint);
      }

      Map<String, Set<String>> backendFactoryToStoreNames =
          ContainerStorageManagerUtil.getBackendFactoryStoreNames(
              nonSideInputStoreNames, taskCheckpoint, storageConfig);

      Map<String, Set<String>> backendFactoryToSideInputStoreNames =
          ContainerStorageManagerUtil.getBackendFactoryStoreNames(
              sideInputStoreNames, taskCheckpoint, storageConfig);

      // include side input stores for (initial bulk) restore if backed up using blob store state backend
      if (backendFactoryToSideInputStoreNames.containsKey(blobStoreStateBackendFactory)) {
        // Named distinctly from the sideInputStoreNames field, which is still read elsewhere in this method.
        Set<String> blobStoreSideInputStoreNames =
            backendFactoryToSideInputStoreNames.get(blobStoreStateBackendFactory);

        if (backendFactoryToStoreNames.containsKey(blobStoreStateBackendFactory)) {
          backendFactoryToStoreNames.get(blobStoreStateBackendFactory).addAll(blobStoreSideInputStoreNames);
        } else {
          backendFactoryToStoreNames.put(blobStoreStateBackendFactory, blobStoreSideInputStoreNames);
        }
      }

      Map<String, TaskRestoreManager> taskStoreRestoreManagers =
          ContainerStorageManagerUtil.createTaskRestoreManagers(
              taskName, backendFactoryToStoreNames, restoreStateBackendFactories,
              storageEngineFactories, storeConsumers,
              inMemoryStores, systemAdmins, restoreExecutor,
              taskModel, jobContext, containerContext,
              samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, serdes,
              loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock);
      taskRestoreManagers.put(taskName, taskStoreRestoreManagers);
      taskBackendFactoryToStoreNames.put(taskName, backendFactoryToStoreNames);
    });

    // if we have received no input checkpoints, it can only be due to two reasons:
    // a) Samza job is new, so it has no previous checkpoints.
    // b) The checkpoints were cleared.
    // We should be able to safely clear local logged stores in either case
    if (taskCheckpoints.isEmpty()) {
      LOG.info("No checkpoints read. Attempting to clear logged stores.");
      clearLoggedStores(loggedStoreBaseDirectory);
    }

    // Init all taskRestores and if successful, restores all the task stores concurrently
    LOG.debug("Pre init and restore checkpoints is: {}", taskCheckpoints);
    CompletableFuture<Map<TaskName, Checkpoint>> initRestoreAndNewCheckpointFuture =
        ContainerStorageManagerRestoreUtil.initAndRestoreTaskInstances(taskRestoreManagers, samzaContainerMetrics,
            checkpointManager, jobContext, containerModel, taskCheckpoints, taskBackendFactoryToStoreNames, config,
            restoreExecutor, taskInstanceMetrics, loggedStoreBaseDirectory, storeConsumers);

    // Update the task checkpoints map, if it was updated during the restore. Throw an exception if the restore or
    // creating a new checkpoint (in case of BlobStoreBackendFactory) failed.
    try {
      Map<TaskName, Checkpoint> newTaskCheckpoints = initRestoreAndNewCheckpointFuture.get();
      taskCheckpoints.putAll(newTaskCheckpoints);
      LOG.debug("Post init and restore checkpoints is: {}. NewTaskCheckpoints are: {}", taskCheckpoints, newTaskCheckpoints);
    } catch (InterruptedException e) {
      LOG.warn("Received an interrupt during store restoration. Interrupting the restore executor to exit "
          + "prematurely without restoring full state.");
      restoreExecutor.shutdownNow();
      // The interrupt is propagated to the caller as-is rather than being wrapped.
      throw e;
    } catch (Exception e) {
      LOG.error("Exception when restoring state.", e);
      throw new SamzaException("Exception when restoring state.", e);
    }

    // Stop each store consumer once
    this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::stop);

    // Now create persistent non-side-input stores in read-write mode, leave non-persistent and side-input stores as-is
    Set<String> inMemoryStoreNames =
        ContainerStorageManagerUtil.getInMemoryStoreNames(this.storageEngineFactories, this.config);
    Set<String> storesToCreate = nonSideInputStoreNames.stream()
        .filter(s -> !inMemoryStoreNames.contains(s)).collect(Collectors.toSet());
    this.taskStores = ContainerStorageManagerUtil.createTaskStores(
        storesToCreate, this.storageEngineFactories, this.sideInputStoreNames,
        this.activeTaskChangelogSystemStreams, this.containerModel, this.jobContext, this.containerContext,
        this.serdes, this.taskInstanceMetrics, this.taskInstanceCollectors, this.storageManagerUtil,
        this.loggedStoreBaseDirectory, this.nonLoggedStoreBaseDirectory, this.config);

    // Add in memory stores, creating the per-task map first when the task has no created stores yet
    this.inMemoryStores.forEach((taskName, stores) ->
        this.taskStores.computeIfAbsent(taskName, ignored -> new HashMap<>()).putAll(stores));

    LOG.info("Store Restore complete");
    return taskCheckpoints;
  }