samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java [334:353]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Builds the map of store name to {@link StorageEngine} for this task.
   *
   * <p>Every entry of {@code nonPersistedStores} is copied into the result as-is, including
   * entries whose name is not in {@code storeNames}. For each name in {@code storeNames} that
   * is not a key of {@code nonPersistedStores}, a new engine is created through
   * {@code ContainerStorageManagerUtil.createStore} in {@code StoreMode.BulkLoad}.
   *
   * @param storeNames names of the stores to provide engines for
   * @param jobContext forwarded unchanged to {@code ContainerStorageManagerUtil.createStore}
   * @param containerContext forwarded unchanged to {@code ContainerStorageManagerUtil.createStore}
   * @param storageEngineFactories forwarded unchanged to {@code createStore}
   *     (presumably keyed by store name; confirm against the caller)
   * @param serdes forwarded unchanged to {@code createStore}
   *     (key semantics are not visible here; confirm against the caller)
   * @param metricsRegistry forwarded unchanged to {@code createStore}
   * @param messageCollector forwarded unchanged to {@code createStore}
   * @param nonPersistedStores already-built engines keyed by store name; these are reused
   *     rather than re-created
   * @return a new mutable {@link HashMap} holding the reused and the newly created engines
   */
  private Map<String, StorageEngine> createStoreEngines(Set<String> storeNames, JobContext jobContext,
      ContainerContext containerContext, Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
      Map<String, Serde<Object>> serdes, MetricsRegistry metricsRegistry,
      MessageCollector messageCollector, Map<String, StorageEngine> nonPersistedStores) {
    Map<String, StorageEngine> storageEngines = new HashMap<>();
    // Reuse the engines that were handed in: copy every non-persisted store into the result.
    nonPersistedStores.forEach(storageEngines::put);
    // Create an engine for each requested store that was not supplied in nonPersistedStores.
    storeNames.stream().filter(s -> !nonPersistedStores.containsKey(s)).forEach(storeName -> {
      // A store counts as "logged" when it has an entry in storeChangelogs.
      boolean isLogged = this.storeChangelogs.containsKey(storeName);
      // Logged and non-logged stores are kept under different base directories.
      File storeBaseDir = isLogged ? this.loggedStoreBaseDirectory : this.nonLoggedStoreBaseDirectory;
      // Resolve this task's directory for the store from the base dir, task name and task mode.
      File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskModel.getTaskName(),
          taskModel.getTaskMode());
      // Engines are requested in BulkLoad mode.
      // NOTE(review): presumably because they are about to be restored - confirm.
      StorageEngine engine = ContainerStorageManagerUtil.createStore(storeName, storeDirectory,
          StorageEngineFactory.StoreMode.BulkLoad, storageEngineFactories, this.storeChangelogs,
          taskModel, jobContext, containerContext, serdes,
          metricsRegistry, messageCollector, this.config);
      storageEngines.put(storeName, engine);
    });
    return storageEngines;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java [188:207]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Assembles the {@link StorageEngine} instances for this task, keyed by store name.
   *
   * <p>All entries of {@code nonPersistedStores} are placed in the result unchanged, whether or
   * not their name appears in {@code storeNames}. Each name in {@code storeNames} that is
   * absent from {@code nonPersistedStores} gets a freshly created engine, obtained from
   * {@code ContainerStorageManagerUtil.createStore} with {@code StoreMode.BulkLoad}.
   *
   * @param storeNames names of the stores that need an engine
   * @param jobContext passed through to {@code ContainerStorageManagerUtil.createStore}
   * @param containerContext passed through to {@code ContainerStorageManagerUtil.createStore}
   * @param storageEngineFactories passed through to {@code createStore}
   *     (presumably keyed by store name; confirm against the caller)
   * @param serdes passed through to {@code createStore}
   *     (key semantics are not visible here; confirm against the caller)
   * @param metricsRegistry passed through to {@code createStore}
   * @param messageCollector passed through to {@code createStore}
   * @param nonPersistedStores pre-existing engines keyed by store name; no new engine is
   *     created for these names
   * @return a new mutable {@link HashMap} containing both the pre-existing and the new engines
   */
  private Map<String, StorageEngine> createStoreEngines(Set<String> storeNames, JobContext jobContext,
      ContainerContext containerContext, Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
      Map<String, Serde<Object>> serdes, MetricsRegistry metricsRegistry,
      MessageCollector messageCollector, Map<String, StorageEngine> nonPersistedStores) {
    Map<String, StorageEngine> storageEngines = new HashMap<>();
    // Seed the result with every engine supplied in nonPersistedStores.
    nonPersistedStores.forEach(storageEngines::put);
    // Build engines only for requested stores that nonPersistedStores does not already cover.
    storeNames.stream().filter(s -> !nonPersistedStores.containsKey(s)).forEach(storeName -> {
      // Presence in storeChangelogs is what marks a store as "logged".
      boolean isLogged = this.storeChangelogs.containsKey(storeName);
      // Pick the base directory that matches the logged / non-logged classification.
      File storeBaseDir = isLogged ? this.loggedStoreBaseDirectory : this.nonLoggedStoreBaseDirectory;
      // Derive the per-task store directory from the base dir, task name and task mode.
      File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskModel.getTaskName(),
          taskModel.getTaskMode());
      // The engine is created in BulkLoad mode.
      // NOTE(review): presumably chosen for the upcoming restore - confirm.
      StorageEngine engine = ContainerStorageManagerUtil.createStore(storeName, storeDirectory,
          StorageEngineFactory.StoreMode.BulkLoad, storageEngineFactories, this.storeChangelogs,
          taskModel, jobContext, containerContext, serdes,
          metricsRegistry, messageCollector, this.config);
      storageEngines.put(storeName, engine);
    });
    return storageEngines;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



