public ContainerStorageManager()

in samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java [125:208]


  /**
   * Creates the storage manager for the stores of all tasks in this container.
   *
   * <p>Besides recording its arguments, the constructor derives:
   * <ul>
   *   <li>the side-input store names and the changelog streams of active tasks for this {@code containerModel};</li>
   *   <li>the store directory paths, which SamzaContainer uses to register a disk-usage metric (the stores do not
   *       have to exist yet);</li>
   *   <li>the in-memory stores, via {@code ContainerStorageManagerUtil.createInMemoryStores};</li>
   *   <li>the changelog consumers, created only for active-task changelog streams;</li>
   *   <li>a fixed-size pool of daemon threads used for restores.</li>
   * </ul>
   *
   * @param checkpointManager checkpoint manager for the job
   * @param containerModel the tasks assigned to this container
   * @param streamMetadataCache cache of stream metadata
   * @param systemAdmins admins for the configured systems
   * @param changelogSystemStreams changelog stream per store name, for all stores (not only active tasks)
   * @param sideInputSystemStreams side-input streams per store name
   * @param storageEngineFactories storage engine factory per store name
   * @param systemFactories system factory per system name (presumably; used to build changelog consumers)
   * @param serdes serdes by name
   * @param config the job config; also used to read the restore thread-pool settings
   * @param taskInstanceMetrics per-task metrics
   * @param samzaContainerMetrics container metrics; its registry is used for the changelog consumers
   * @param jobContext job context passed to store creation
   * @param containerContext container context passed to store creation
   * @param restoreStateBackendFactories restore backends; their count scales the restore thread pool
   * @param taskInstanceCollectors per-task collectors passed to store creation
   * @param loggedStoreBaseDirectory base directory for logged stores
   * @param nonLoggedStoreBaseDirectory base directory for non-logged stores
   * @param serdeManager serde manager
   * @param clock clock used by this manager
   */
  public ContainerStorageManager(
      CheckpointManager checkpointManager,
      ContainerModel containerModel,
      StreamMetadataCache streamMetadataCache,
      SystemAdmins systemAdmins,
      Map<String, SystemStream> changelogSystemStreams,
      Map<String, Set<SystemStream>> sideInputSystemStreams,
      Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
      Map<String, SystemFactory> systemFactories,
      Map<String, Serde<Object>> serdes,
      Config config,
      Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
      SamzaContainerMetrics samzaContainerMetrics,
      JobContext jobContext,
      ContainerContext containerContext,
      Map<String, StateBackendFactory> restoreStateBackendFactories,
      Map<TaskName, TaskInstanceCollector> taskInstanceCollectors,
      File loggedStoreBaseDirectory,
      File nonLoggedStoreBaseDirectory,
      SerdeManager serdeManager,
      Clock clock) {
    this.checkpointManager = checkpointManager;
    this.containerModel = containerModel;
    this.streamMetadataCache = streamMetadataCache;
    this.systemAdmins = systemAdmins;
    this.changelogSystemStreams = changelogSystemStreams;
    this.sideInputSystemStreams = sideInputSystemStreams;
    this.storageEngineFactories = storageEngineFactories;
    this.systemFactories = systemFactories;
    this.serdes = serdes;
    this.config = config;
    this.taskInstanceMetrics = taskInstanceMetrics;
    this.samzaContainerMetrics = samzaContainerMetrics;
    this.jobContext = jobContext;
    this.containerContext = containerContext;
    this.restoreStateBackendFactories = restoreStateBackendFactories;
    this.taskInstanceCollectors = taskInstanceCollectors;
    this.loggedStoreBaseDirectory = loggedStoreBaseDirectory;
    this.nonLoggedStoreBaseDirectory = nonLoggedStoreBaseDirectory;
    this.serdeManager = serdeManager;
    this.clock = clock;

    this.sideInputStoreNames =
        ContainerStorageManagerUtil.getSideInputStoreNames(sideInputSystemStreams, changelogSystemStreams, containerModel);
    this.activeTaskChangelogSystemStreams =
        ContainerStorageManagerUtil.getActiveTaskChangelogSystemStreams(changelogSystemStreams, containerModel);

    LOG.info("Starting with changelogSystemStreams = {} activeTaskChangelogSystemStreams = {} sideInputSystemStreams = {}",
        changelogSystemStreams, activeTaskChangelogSystemStreams, sideInputSystemStreams);

    // Sharing one directory is allowed but risky, so only warn. The trailing space in the first literal keeps the
    // concatenated message readable ("configure them").
    if (loggedStoreBaseDirectory != null && loggedStoreBaseDirectory.equals(nonLoggedStoreBaseDirectory)) {
      LOG.warn("Logged and non-logged store base directory are configured to same path: {}. It is recommended to configure "
              + "them separately to ensure clean up of non-logged store data doesn't accidentally impact logged store data.",
          loggedStoreBaseDirectory);
    }

    // Note: The store directory paths are used by SamzaContainer to add a metric to watch the disk space usage
    // of the store directories. The stores themselves do not need to be created, but the store directory paths need
    // to be set to be able to monitor them once they're created and in use.
    this.storeDirectoryPaths = ContainerStorageManagerUtil.getStoreDirPaths(config, storageEngineFactories,
        activeTaskChangelogSystemStreams, sideInputStoreNames, containerModel, storageManagerUtil,
        loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory);

    this.inMemoryStores = ContainerStorageManagerUtil.createInMemoryStores(
        activeTaskChangelogSystemStreams, storageEngineFactories, sideInputStoreNames, containerModel, jobContext,
        containerContext, taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil,
        loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);

    // Refactor Note (prateekm): in the previous version, the field 'this.changelogSystemStreams' actually held the
    // active-task changelog streams, which differs subtly from the changelogSystemStreams argument. Consumers are
    // therefore built from activeTaskChangelogSystemStreams only: a map from store name to changelog consumer, with
    // one consumer per system.
    this.storeConsumers = ContainerStorageManagerUtil.createStoreChangelogConsumers(
        activeTaskChangelogSystemStreams, systemFactories, samzaContainerMetrics.registry(), config);

    JobConfig jobConfig = new JobConfig(config);
    // Size the pool by restore work (2 threads per task per restore backend). Raise it to at least the configured
    // pool size, then cap it at the configured maximum; the cap wins if the two settings conflict.
    int restoreThreadsForWorkload = containerModel.getTasks().size() * restoreStateBackendFactories.size() * 2;
    int restoreThreadPoolSize = Math.min(
        Math.max(restoreThreadsForWorkload, jobConfig.getRestoreThreadPoolSize()),
        jobConfig.getRestoreThreadPoolMaxSize());
    this.restoreExecutor = Executors.newFixedThreadPool(restoreThreadPoolSize,
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat(RESTORE_THREAD_NAME).build());
  }