in samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java [125:208]
/**
 * Builds a container storage manager from the supplied collaborators.
 *
 * <p>Besides storing its arguments, the constructor derives the side input store names and the
 * changelog system streams of active tasks, computes the store directory paths, creates the
 * in-memory stores and the changelog consumers, and creates the fixed-size executor used for
 * restores (daemon threads).
 *
 * @param checkpointManager checkpoint manager; stored as-is
 * @param containerModel model of this container; its tasks drive store setup and restore pool sizing
 * @param streamMetadataCache stream metadata cache; stored as-is
 * @param systemAdmins system admins; stored as-is
 * @param changelogSystemStreams changelog system stream per key (presumably store name; confirm with caller)
 * @param sideInputSystemStreams side input system streams per key (presumably store name; confirm with caller)
 * @param storageEngineFactories storage engine factories, used to compute store paths and create in-memory stores
 * @param systemFactories system factories, used to create the changelog consumers
 * @param serdes serdes, passed to in-memory store creation
 * @param config job configuration
 * @param taskInstanceMetrics per-task metrics, passed to in-memory store creation
 * @param samzaContainerMetrics container metrics; its registry is passed to changelog consumer creation
 * @param jobContext job context, passed to in-memory store creation
 * @param containerContext container context, passed to in-memory store creation
 * @param restoreStateBackendFactories restore state backend factories; their count feeds restore pool sizing
 * @param taskInstanceCollectors per-task collectors, passed to in-memory store creation
 * @param loggedStoreBaseDirectory base directory for logged stores
 * @param nonLoggedStoreBaseDirectory base directory for non-logged stores
 * @param serdeManager serde manager; stored as-is
 * @param clock clock; stored as-is
 */
public ContainerStorageManager(
    CheckpointManager checkpointManager,
    ContainerModel containerModel,
    StreamMetadataCache streamMetadataCache,
    SystemAdmins systemAdmins,
    Map<String, SystemStream> changelogSystemStreams,
    Map<String, Set<SystemStream>> sideInputSystemStreams,
    Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
    Map<String, SystemFactory> systemFactories,
    Map<String, Serde<Object>> serdes,
    Config config,
    Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
    SamzaContainerMetrics samzaContainerMetrics,
    JobContext jobContext,
    ContainerContext containerContext,
    Map<String, StateBackendFactory> restoreStateBackendFactories,
    Map<TaskName, TaskInstanceCollector> taskInstanceCollectors,
    File loggedStoreBaseDirectory,
    File nonLoggedStoreBaseDirectory,
    SerdeManager serdeManager,
    Clock clock) {
  this.checkpointManager = checkpointManager;
  this.containerModel = containerModel;
  this.streamMetadataCache = streamMetadataCache;
  this.systemAdmins = systemAdmins;
  this.changelogSystemStreams = changelogSystemStreams;
  this.sideInputSystemStreams = sideInputSystemStreams;
  this.storageEngineFactories = storageEngineFactories;
  this.systemFactories = systemFactories;
  this.serdes = serdes;
  this.config = config;
  this.taskInstanceMetrics = taskInstanceMetrics;
  this.samzaContainerMetrics = samzaContainerMetrics;
  this.jobContext = jobContext;
  this.containerContext = containerContext;
  this.restoreStateBackendFactories = restoreStateBackendFactories;
  this.taskInstanceCollectors = taskInstanceCollectors;
  this.loggedStoreBaseDirectory = loggedStoreBaseDirectory;
  this.nonLoggedStoreBaseDirectory = nonLoggedStoreBaseDirectory;
  this.serdeManager = serdeManager;
  this.clock = clock;

  this.sideInputStoreNames =
      ContainerStorageManagerUtil.getSideInputStoreNames(sideInputSystemStreams, changelogSystemStreams, containerModel);
  this.activeTaskChangelogSystemStreams =
      ContainerStorageManagerUtil.getActiveTaskChangelogSystemStreams(changelogSystemStreams, containerModel);

  LOG.info("Starting with changelogSystemStreams = {} activeTaskChangelogSystemStreams = {} sideInputSystemStreams = {}",
      changelogSystemStreams, activeTaskChangelogSystemStreams, sideInputSystemStreams);

  if (loggedStoreBaseDirectory != null && loggedStoreBaseDirectory.equals(nonLoggedStoreBaseDirectory)) {
    LOG.warn("Logged and non-logged store base directory are configured to same path: {}. It is recommended to configure "
        + "them separately to ensure clean up of non-logged store data doesn't accidentally impact logged store data.",
        loggedStoreBaseDirectory);
  }

  // Note: The store directory paths are used by SamzaContainer to add a metric to watch the disk space usage
  // of the store directories. The stores themselves do not need to be created yet, but the store directory paths
  // need to be set so that they can be monitored once the stores are created and in use.
  this.storeDirectoryPaths = ContainerStorageManagerUtil.getStoreDirPaths(config, storageEngineFactories,
      activeTaskChangelogSystemStreams, sideInputStoreNames, containerModel, storageManagerUtil,
      loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory);

  this.inMemoryStores = ContainerStorageManagerUtil.createInMemoryStores(
      activeTaskChangelogSystemStreams, storageEngineFactories, sideInputStoreNames, containerModel, jobContext,
      containerContext, taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil,
      loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);

  // Refactor Note (prateekm): in previous version, there's a subtle difference between 'this.changelogSystemStreams'
  // (which is actually activeTaskChangelogSystemStreams) vs the passed in changelogSystemStreams.
  // Create a map from storeNames to changelog system consumers (1 per system in activeTaskChangelogSystemStreams).
  this.storeConsumers = ContainerStorageManagerUtil.createStoreChangelogConsumers(
      activeTaskChangelogSystemStreams, systemFactories, samzaContainerMetrics.registry(), config);

  // Pool size: the larger of (tasks * restore backends * 2) and the configured size, capped by the configured max.
  JobConfig jobConfig = new JobConfig(config);
  int requestedRestoreThreads = Math.max(
      containerModel.getTasks().size() * restoreStateBackendFactories.size() * 2,
      jobConfig.getRestoreThreadPoolSize());
  int restoreThreadPoolSize = Math.min(requestedRestoreThreads, jobConfig.getRestoreThreadPoolMaxSize());
  if (restoreThreadPoolSize < 1) {
    // Executors.newFixedThreadPool rejects non-positive sizes with IllegalArgumentException; fall back to a
    // single thread instead of failing construction, and make the fallback visible in the logs.
    LOG.warn("Computed restore thread pool size {} is not positive; using 1 thread instead.", restoreThreadPoolSize);
    restoreThreadPoolSize = 1;
  }
  this.restoreExecutor = Executors.newFixedThreadPool(restoreThreadPoolSize,
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat(RESTORE_THREAD_NAME).build());
}