in samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java [197:297]
/**
 * Starts side-input restoration if this manager has any side inputs, blocking the calling thread until it ends.
 *
 * <p>Steps, when {@code hasSideInputs} is true:
 * <ol>
 *   <li>Initializes every side-input handler.</li>
 *   <li>Builds one {@code SideInputTask} for each task that has at least one side-input SSP. Each task gets its own
 *       side-input-prefixed {@code TaskInstanceMetrics}.</li>
 *   <li>Registers every side-input SSP with {@code sideInputSystemConsumers} at its handler's starting offset.</li>
 *   <li>Starts the consumers.</li>
 *   <li>Runs a dedicated {@code RunLoop} on {@code sideInputsExecutor}.</li>
 * </ol>
 * The calling thread then waits until one of three things happens: {@code awaitSideInputTasks} reports that the
 * side-input tasks have caught up, {@code shouldShutdown} is set, or the run loop records an exception.
 * When there are no side inputs this method does nothing.
 *
 * @throws SamzaException if no starting offset can be obtained for a side-input SSP, if the side-input run loop
 *     recorded an exception, or if an {@code InterruptedException} is raised while waiting
 */
public void start() {
  if (this.hasSideInputs) {
    LOG.info("SideInput Restore started");
    // initialize the sideInputStorageManagers
    this.sspSideInputHandlers.values().forEach(TaskSideInputHandler::init);

    // Several SSPs may share one handler; collapse them to exactly one handler per task.
    Map<TaskName, TaskSideInputHandler> taskSideInputHandlers = this.sspSideInputHandlers.values().stream()
        .distinct()
        .collect(Collectors.toMap(TaskSideInputHandler::getTaskName, Function.identity()));

    // Built once and shared by every side-input task instead of being re-created per task.
    TaskConfig taskConfig = new TaskConfig(this.config);
    Map<TaskName, TaskInstanceMetrics> sideInputTaskMetrics = new HashMap<>();
    Map<TaskName, RunLoopTask> sideInputTasks = new HashMap<>();
    this.taskSideInputStoreSSPs.forEach((taskName, storesToSSPs) -> {
      // Union of the SSPs across all of this task's side-input stores.
      Set<SystemStreamPartition> taskSSPs = storesToSSPs.values().stream()
          .flatMap(Set::stream)
          .collect(Collectors.toSet());
      if (!taskSSPs.isEmpty()) {
        TaskInstanceMetrics taskMetrics = this.taskInstanceMetrics.get(taskName);
        // Side-input metrics reuse the task's registry under a prefixed source name.
        String sideInputSource = SIDE_INPUTS_METRICS_PREFIX + taskMetrics.source();
        TaskInstanceMetrics sideInputMetrics =
            new TaskInstanceMetrics(sideInputSource, taskMetrics.registry(), SIDE_INPUTS_METRICS_PREFIX);
        sideInputTaskMetrics.put(taskName, sideInputMetrics);
        RunLoopTask sideInputTask = new SideInputTask(taskName, taskSSPs,
            taskSideInputHandlers.get(taskName), sideInputMetrics, taskConfig.getCommitMs());
        sideInputTasks.put(taskName, sideInputTask);
      }
    });

    // register all sideInput SSPs with the consumers
    for (SystemStreamPartition ssp : this.sspSideInputHandlers.keySet()) {
      TaskSideInputHandler handler = this.sspSideInputHandlers.get(ssp);
      String startingOffset = handler.getStartingOffset(ssp);
      if (startingOffset == null) {
        throw new SamzaException(
            "No starting offset could be obtained for SideInput SystemStreamPartition : " + ssp + ". Consumer cannot start.");
      }
      // register startingOffset with the sysConsumer and register a metric for it
      sideInputSystemConsumers.register(ssp, startingOffset);
      TaskName ownerTask = handler.getTaskName();
      // Expose the last processed offset under both the regular and the side-input-prefixed task metrics.
      taskInstanceMetrics.get(ownerTask).addOffsetGauge(
          ssp, ScalaJavaUtil.toScalaFunction(() -> handler.getLastProcessedOffset(ssp)));
      sideInputTaskMetrics.get(ownerTask).addOffsetGauge(
          ssp, ScalaJavaUtil.toScalaFunction(() -> handler.getLastProcessedOffset(ssp)));
    }

    // start the systemConsumers for consuming input
    this.sideInputSystemConsumers.start();

    SamzaContainerMetrics sideInputContainerMetrics =
        new SamzaContainerMetrics(SIDE_INPUTS_METRICS_PREFIX + this.samzaContainerMetrics.source(),
            this.samzaContainerMetrics.registry(), SIDE_INPUTS_METRICS_PREFIX);
    SideInputRunLoopConfig runLoopConfig = new SideInputRunLoopConfig(config);
    this.sideInputRunLoop = new RunLoop(sideInputTasks,
        null, // all operations are executed in the main runloop thread
        this.sideInputSystemConsumers,
        sideInputContainerMetrics,
        System::nanoTime,
        runLoopConfig);

    try {
      sideInputsExecutor.submit(() -> {
        try {
          sideInputRunLoop.run();
        } catch (Exception e) {
          // NOTE(review): only Exception is recorded here. An Error thrown by run() never reaches
          // sideInputException, so the wait loop below cannot observe it. Confirm this is intended.
          LOG.error("Exception in reading sideInputs", e);
          sideInputException = e;
        }
      });
      // Make the main thread wait until all sideInputs have been caughtup or an exception was thrown
      while (!shouldShutdown && sideInputException == null &&
          !awaitSideInputTasks(sideInputTaskLatches)) {
        LOG.debug("Waiting for SideInput bootstrap to complete");
      }
      if (sideInputException != null) { // Throw exception if there was an exception in catching-up sideInputs
        throw new SamzaException("Exception in restoring sideInputs", sideInputException);
      }
    } catch (InterruptedException e) {
      LOG.warn("Received an interrupt during side inputs store restoration."
          + " Exiting prematurely without completing store restore.");
      /*
       * We want to stop side input restoration and rethrow the exception upstream. Container should handle the
       * interrupt exception and shutdown the components and cleaning up the resource. We don't want to clean up the
       * resources prematurely here.
       *
       * NOTE(review): the thread's interrupt status is not restored before rethrowing. Confirm the container's
       * shutdown sequence does not depend on it before adding Thread.currentThread().interrupt().
       */
      shouldShutdown = true; // todo: should we cancel the flush future right away or wait for container to handle it as part of shutdown sequence?
      throw new SamzaException("Side inputs read was interrupted", e);
    }
    LOG.info("SideInput Restore complete");
  }
}