public SideInputsManager()

in samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java [116:194]


  /**
   * Builds the side-input state for a container.
   *
   * <p>The constructor resolves the per-task side-input SSPs and the side-input store names, creates the
   * side-input stores and their handlers, and, only when at least one side-input SSP exists, creates the
   * {@link SystemConsumers} that reads the side-input streams. When no side-input SSP exists, this constructor
   * does not assign {@code sideInputSystemConsumers}.
   *
   * @param sideInputSystemStreams side-input system streams; used to resolve the task SSPs and store names
   * @param systemFactories system factories used to create the side-input system consumers
   * @param changelogSystemStreams changelog system streams; used to resolve the task SSPs and store names
   * @param activeTaskChangelogSystemStreams changelog system streams forwarded to the store creation helper
   * @param storageEngineFactories storage engine factories forwarded to the store creation helper
   * @param containerModel container model forwarded to the SSP, store and handler helpers
   * @param jobContext job context forwarded to the store creation helper
   * @param containerContext container context forwarded to the store creation helper
   * @param samzaContainerMetrics container metrics; its registry backs the side-input consumer metrics
   * @param taskInstanceMetrics task instance metrics forwarded to the store and handler helpers
   * @param taskInstanceCollectors task instance collectors forwarded to the store creation helper
   * @param streamMetadataCache cache queried for the metadata of the side-input system streams
   * @param systemAdmins system admins forwarded to the handlers, the chooser and the system consumers
   * @param serdeManager serde manager handed to the side-input system consumers
   * @param serdes serdes forwarded to the store and handler helpers
   * @param storageManagerUtil storage manager utility forwarded to the store creation helper
   * @param loggedStoreBaseDirectory base directory forwarded to the store and handler helpers
   * @param nonLoggedStoreBaseDirectory base directory forwarded to the store and handler helpers
   * @param config job configuration
   * @param clock clock forwarded to the side-input handler helper
   */
  public SideInputsManager(Map<String, Set<SystemStream>> sideInputSystemStreams,
      Map<String, SystemFactory> systemFactories,
      Map<String, SystemStream> changelogSystemStreams,
      Map<String, SystemStream> activeTaskChangelogSystemStreams,
      Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
      ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
      SamzaContainerMetrics samzaContainerMetrics,
      Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
      Map<TaskName, TaskInstanceCollector> taskInstanceCollectors,
      StreamMetadataCache streamMetadataCache,
      SystemAdmins systemAdmins,
      SerdeManager serdeManager, Map<String, Serde<Object>> serdes,
      StorageManagerUtil storageManagerUtil,
      File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory,
      Config config, Clock clock) {
    this.taskSideInputStoreSSPs = getTaskSideInputSSPs(sideInputSystemStreams, changelogSystemStreams, containerModel);
    this.sideInputStoreNames = ContainerStorageManagerUtil.getSideInputStoreNames(
        sideInputSystemStreams, changelogSystemStreams, containerModel);
    this.sideInputTaskLatches = new HashMap<>();

    // Side inputs count as "in use" as soon as a single SSP shows up under any store of any task.
    boolean anySideInputSspPresent = this.taskSideInputStoreSSPs.values().stream()
        .flatMap(storeToSsps -> storeToSsps.values().stream())
        .flatMap(Set::stream)
        .findAny()
        .isPresent();
    this.hasSideInputs = anySideInputSspPresent;

    this.taskInstanceMetrics = taskInstanceMetrics;
    this.samzaContainerMetrics = samzaContainerMetrics;
    this.config = config;

    // Create the task stores for the side-input store names; the same name set is passed for both
    // store-name arguments of the helper.
    this.sideInputStores = ContainerStorageManagerUtil.createTaskStores(
        sideInputStoreNames, storageEngineFactories, sideInputStoreNames,
        activeTaskChangelogSystemStreams, containerModel, jobContext, containerContext, serdes,
        taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil,
        loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);

    this.sspSideInputHandlers = createSideInputHandlers(hasSideInputs, sideInputStores, taskSideInputStoreSSPs,
        sideInputTaskLatches, taskInstanceMetrics, containerModel, streamMetadataCache, systemAdmins, serdes,
        loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock
    );

    // Nothing to consume when no side inputs are configured; the consumers are left unassigned.
    if (!this.hasSideInputs) {
      return;
    }

    // Collapse every task's side-input SSPs into the distinct system streams they belong to.
    Set<SystemStream> sideInputStreams = this.taskSideInputStoreSSPs.values().stream()
        .flatMap(storeToSsps -> storeToSsps.values().stream())
        .flatMap(Collection::stream)
        .map(SystemStreamPartition::getSystemStream)
        .collect(Collectors.toSet());

    // Distinct system names behind those streams.
    Set<String> sideInputSystemNames = sideInputStreams.stream()
        .map(SystemStream::getSystem)
        .collect(Collectors.toSet());

    // One SystemConsumer per side-input system, keyed by system name.
    Map<String, SystemConsumer> consumersBySystem =
        ContainerStorageManagerUtil.createSystemConsumers(sideInputSystemNames, systemFactories,
            samzaContainerMetrics.registry(), config);

    scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> sideInputStreamMetadata =
        streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(sideInputStreams).toSet(), false);

    // The side-input consumer metrics share the registry of the samza container metrics.
    SystemConsumersMetrics consumersMetrics =
        new SystemConsumersMetrics(samzaContainerMetrics.registry(), SIDE_INPUTS_METRICS_PREFIX);

    MessageChooser messageChooser = DefaultChooser.apply(sideInputStreamMetadata, new RoundRobinChooserFactory(),
        config, consumersMetrics.registry(), systemAdmins);

    ApplicationConfig appConfig = new ApplicationConfig(config);

    this.sideInputSystemConsumers =
        new SystemConsumers(messageChooser, ScalaJavaUtil.toScalaMap(consumersBySystem), systemAdmins,
            serdeManager, consumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(),
            SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
            TaskConfig.DEFAULT_POLL_INTERVAL_MS, ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()),
            JobConfig.DEFAULT_JOB_ELASTICITY_FACTOR, appConfig.getRunId());
  }