in samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java [116:194]
public SideInputsManager(Map<String, Set<SystemStream>> sideInputSystemStreams,
Map<String, SystemFactory> systemFactories,
Map<String, SystemStream> changelogSystemStreams,
Map<String, SystemStream> activeTaskChangelogSystemStreams,
Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
SamzaContainerMetrics samzaContainerMetrics,
Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
Map<TaskName, TaskInstanceCollector> taskInstanceCollectors,
StreamMetadataCache streamMetadataCache,
SystemAdmins systemAdmins,
SerdeManager serdeManager, Map<String, Serde<Object>> serdes,
StorageManagerUtil storageManagerUtil,
File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory,
Config config, Clock clock) {
this.taskSideInputStoreSSPs = getTaskSideInputSSPs(sideInputSystemStreams, changelogSystemStreams, containerModel);
this.sideInputStoreNames = ContainerStorageManagerUtil.getSideInputStoreNames(
sideInputSystemStreams, changelogSystemStreams, containerModel);
this.sideInputTaskLatches = new HashMap<>();
this.hasSideInputs = this.taskSideInputStoreSSPs.values().stream()
.flatMap(m -> m.values().stream())
.flatMap(Collection::stream)
.findAny()
.isPresent();
this.taskInstanceMetrics = taskInstanceMetrics;
this.samzaContainerMetrics = samzaContainerMetrics;
this.config = config;
// create side input taskStores for all tasks in the containerModel and each store in storageEngineFactories
this.sideInputStores = ContainerStorageManagerUtil.createTaskStores(
sideInputStoreNames, storageEngineFactories, sideInputStoreNames,
activeTaskChangelogSystemStreams, containerModel, jobContext, containerContext, serdes,
taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);
this.sspSideInputHandlers = createSideInputHandlers(hasSideInputs, sideInputStores, taskSideInputStoreSSPs,
sideInputTaskLatches, taskInstanceMetrics, containerModel, streamMetadataCache, systemAdmins, serdes,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock
);
// create SystemConsumers for consuming from taskSideInputSSPs, if sideInputs are being used
if (this.hasSideInputs) {
Set<SystemStream> containerSideInputSystemStreams = this.taskSideInputStoreSSPs.values().stream()
.flatMap(map -> map.values().stream())
.flatMap(Set::stream)
.map(SystemStreamPartition::getSystemStream)
.collect(Collectors.toSet());
Set<String> containerSideInputSystems = containerSideInputSystemStreams.stream()
.map(SystemStream::getSystem)
.collect(Collectors.toSet());
// create sideInput consumers indexed by systemName
// Mapping from storeSystemNames to SystemConsumers
Map<String, SystemConsumer> sideInputConsumers =
ContainerStorageManagerUtil.createSystemConsumers(containerSideInputSystems, systemFactories,
samzaContainerMetrics.registry(), config);
scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> inputStreamMetadata =
streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(containerSideInputSystemStreams).toSet(), false);
// we use the same registry as samza-container-metrics
SystemConsumersMetrics sideInputSystemConsumersMetrics =
new SystemConsumersMetrics(samzaContainerMetrics.registry(), SIDE_INPUTS_METRICS_PREFIX);
MessageChooser chooser = DefaultChooser.apply(inputStreamMetadata, new RoundRobinChooserFactory(), config,
sideInputSystemConsumersMetrics.registry(), systemAdmins);
ApplicationConfig applicationConfig = new ApplicationConfig(config);
this.sideInputSystemConsumers =
new SystemConsumers(chooser, ScalaJavaUtil.toScalaMap(sideInputConsumers), systemAdmins, serdeManager,
sideInputSystemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(),
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
TaskConfig.DEFAULT_POLL_INTERVAL_MS, ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()),
JobConfig.DEFAULT_JOB_ELASTICITY_FACTOR, applicationConfig.getRunId());
}
}