public TaskRestoreManager getRestoreManager()

in samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java [91:156]


  /**
   * Creates the {@link TaskRestoreManager} that restores the requested stores from their changelogs.
   *
   * <p>The store-to-changelog mapping is read from {@code config} via {@link StorageConfig}. Each
   * changelog stream is paired with the changelog partition of every task in the container model
   * to form the set of changelog SSPs; that set is only consumed by the SSP cache handed to the
   * transactional manager. The store changelogs are passed through
   * {@code filterStandbySystemStreams} before being given to either manager.
   *
   * <p>When {@link TaskConfig#getTransactionalStateRestoreEnabled()} is {@code true} a
   * {@link TransactionalStateTaskRestoreManager} is returned; otherwise a
   * {@link NonTransactionalStateTaskRestoreManager} is returned.
   *
   * @param jobContext job context forwarded to the manager; its job model also supplies the max
   *                   changelog stream partition count for the non-transactional manager
   * @param containerContext container context whose container model provides the tasks
   * @param taskModel model of the task being restored, forwarded to the manager
   * @param restoreExecutor executor forwarded to the manager
   * @param metricsRegistry metrics registry forwarded to the manager
   * @param storesToRestore names of the stores to restore, forwarded to the manager
   * @param config configuration used to resolve store changelogs and the transactional flag
   * @param clock clock forwarded to the manager and to the metadata cache helpers
   * @param loggedStoreBaseDir base directory for logged stores, forwarded to the manager
   * @param nonLoggedStoreBaseDir base directory for non-logged stores, forwarded to the manager
   * @param kafkaChangelogRestoreParams supplies system admins, in-memory stores, storage engine
   *                                    factories, serdes, store consumers and the collector
   * @return a transactional or non-transactional restore manager, depending on {@code config}
   */
  public TaskRestoreManager getRestoreManager(JobContext jobContext,
      ContainerContext containerContext,
      TaskModel taskModel,
      ExecutorService restoreExecutor,
      MetricsRegistry metricsRegistry,
      Set<String> storesToRestore,
      Config config,
      Clock clock,
      File loggedStoreBaseDir,
      File nonLoggedStoreBaseDir,
      KafkaChangelogRestoreParams kafkaChangelogRestoreParams) {
    StorageConfig storageConfig = new StorageConfig(config);
    Map<String, SystemStream> changelogsByStore = storageConfig.getStoreChangelogs();

    // One SSP per (changelog stream, task changelog partition) pair across all container tasks.
    Set<SystemStreamPartition> allChangelogPartitions = changelogsByStore.values()
        .stream()
        .flatMap(changelogStream ->
            containerContext.getContainerModel().getTasks().values()
                .stream()
                .map(containerTask ->
                    new SystemStreamPartition(changelogStream, containerTask.getChangelogPartition())))
        .collect(Collectors.toSet());

    // Standby store-ssp pairs are dropped before the changelogs reach the restore manager.
    Map<String, SystemStream> activeChangelogsByStore =
        filterStandbySystemStreams(changelogsByStore, containerContext.getContainerModel());
    SystemAdmins admins = new SystemAdmins(kafkaChangelogRestoreParams.getSystemAdmins());

    boolean transactionalRestoreEnabled = new TaskConfig(config).getTransactionalStateRestoreEnabled();
    if (!transactionalRestoreEnabled) {
      // Non-transactional restore relies on the stream metadata cache and the job-wide
      // max changelog partition count instead of the SSP cache.
      return new NonTransactionalStateTaskRestoreManager(
          storesToRestore,
          jobContext,
          containerContext,
          taskModel,
          restoreExecutor,
          activeChangelogsByStore,
          kafkaChangelogRestoreParams.getInMemoryStores(),
          kafkaChangelogRestoreParams.getStorageEngineFactories(),
          kafkaChangelogRestoreParams.getSerdes(),
          admins,
          getStreamCache(admins, clock),
          kafkaChangelogRestoreParams.getStoreConsumers(),
          metricsRegistry,
          kafkaChangelogRestoreParams.getCollector(),
          jobContext.getJobModel().getMaxChangeLogStreamPartitions(),
          loggedStoreBaseDir,
          nonLoggedStoreBaseDir,
          config,
          clock
      );
    }

    return new TransactionalStateTaskRestoreManager(
        storesToRestore,
        jobContext,
        containerContext,
        taskModel,
        restoreExecutor,
        activeChangelogsByStore,
        kafkaChangelogRestoreParams.getInMemoryStores(),
        kafkaChangelogRestoreParams.getStorageEngineFactories(),
        kafkaChangelogRestoreParams.getSerdes(),
        admins,
        kafkaChangelogRestoreParams.getStoreConsumers(),
        metricsRegistry,
        kafkaChangelogRestoreParams.getCollector(),
        getSspCache(admins, clock, allChangelogPartitions),
        loggedStoreBaseDir,
        nonLoggedStoreBaseDir,
        config,
        clock
    );
  }