public void start()

in samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java [197:297]


  /**
   * Starts side-input restoration and blocks the calling thread until all side-input tasks have caught up.
   *
   * <p>Does nothing when {@code hasSideInputs} is false. Otherwise it does the following, in order:
   * <ol>
   *   <li>Initializes every {@link TaskSideInputHandler}.</li>
   *   <li>Builds one {@code SideInputTask} for each task that owns at least one side-input SSP. Each task gets its
   *       own side-input {@code TaskInstanceMetrics}, whose source is prefixed with
   *       {@code SIDE_INPUTS_METRICS_PREFIX}.</li>
   *   <li>Registers every side-input SSP with {@code sideInputSystemConsumers} at its handler's starting offset.
   *       It also adds last-processed-offset gauges to both the regular and the side-input task metrics.</li>
   *   <li>Starts the consumers.</li>
   *   <li>Runs a {@code RunLoop} over the side-input tasks on {@code sideInputsExecutor}.</li>
   *   <li>Waits until {@code awaitSideInputTasks(sideInputTaskLatches)} returns true, the run loop records a
   *       failure, or {@code shouldShutdown} is set.</li>
   * </ol>
   *
   * @throws SamzaException if a side-input SSP has no starting offset, if the side-input run loop fails, or if the
   *     calling thread is interrupted while waiting
   */
  public void start() {
    if (this.hasSideInputs) {
      LOG.info("SideInput Restore started");

      // initialize the sideInputStorageManagers
      this.sspSideInputHandlers.values().forEach(TaskSideInputHandler::init);

      // Several SSPs may share one handler, so de-duplicate before keying by task name
      // (Collectors.toMap throws on duplicate keys).
      Map<TaskName, TaskSideInputHandler> taskSideInputHandlers = this.sspSideInputHandlers.values().stream()
          .distinct()
          .collect(Collectors.toMap(TaskSideInputHandler::getTaskName, Function.identity()));

      // Built once and shared by every side-input task (previously rebuilt per task).
      TaskConfig taskConfig = new TaskConfig(this.config);

      Map<TaskName, TaskInstanceMetrics> sideInputTaskMetrics = new HashMap<>();
      Map<TaskName, RunLoopTask> sideInputTasks = new HashMap<>();
      this.taskSideInputStoreSSPs.forEach((taskName, storesToSSPs) -> {
        Set<SystemStreamPartition> taskSSPs = storesToSSPs.values().stream()
            .flatMap(Set::stream)
            .collect(Collectors.toSet());

        // Tasks without any side-input SSPs get neither side-input metrics nor a side-input task.
        if (!taskSSPs.isEmpty()) {
          TaskInstanceMetrics taskMetrics = this.taskInstanceMetrics.get(taskName);
          String sideInputSource = SIDE_INPUTS_METRICS_PREFIX + taskMetrics.source();
          TaskInstanceMetrics sideInputMetrics =
              new TaskInstanceMetrics(sideInputSource, taskMetrics.registry(), SIDE_INPUTS_METRICS_PREFIX);
          sideInputTaskMetrics.put(taskName, sideInputMetrics);

          RunLoopTask sideInputTask = new SideInputTask(taskName, taskSSPs,
              taskSideInputHandlers.get(taskName), sideInputMetrics, taskConfig.getCommitMs());
          sideInputTasks.put(taskName, sideInputTask);
        }
      });

      // register all sideInput SSPs with the consumers
      for (SystemStreamPartition ssp : this.sspSideInputHandlers.keySet()) {
        TaskSideInputHandler handler = this.sspSideInputHandlers.get(ssp);
        String startingOffset = handler.getStartingOffset(ssp);

        if (startingOffset == null) {
          throw new SamzaException(
              "No starting offset could be obtained for SideInput SystemStreamPartition : " + ssp + ". Consumer cannot start.");
        }

        // register startingOffset with the sysConsumer and register a metric for it
        sideInputSystemConsumers.register(ssp, startingOffset);
        TaskName handlerTaskName = handler.getTaskName();
        taskInstanceMetrics.get(handlerTaskName).addOffsetGauge(
            ssp, ScalaJavaUtil.toScalaFunction(() -> handler.getLastProcessedOffset(ssp)));
        sideInputTaskMetrics.get(handlerTaskName).addOffsetGauge(
            ssp, ScalaJavaUtil.toScalaFunction(() -> handler.getLastProcessedOffset(ssp)));
      }

      // start the systemConsumers for consuming input
      this.sideInputSystemConsumers.start();

      SamzaContainerMetrics sideInputContainerMetrics =
          new SamzaContainerMetrics(SIDE_INPUTS_METRICS_PREFIX + this.samzaContainerMetrics.source(),
              this.samzaContainerMetrics.registry(), SIDE_INPUTS_METRICS_PREFIX);

      SideInputRunLoopConfig runLoopConfig = new SideInputRunLoopConfig(config);
      this.sideInputRunLoop = new RunLoop(sideInputTasks,
          null, // all operations are executed in the main runloop thread
          this.sideInputSystemConsumers,
          sideInputContainerMetrics,
          System::nanoTime,
          runLoopConfig);

      try {
        sideInputsExecutor.submit(() -> {
          try {
            sideInputRunLoop.run();
          } catch (Throwable t) {
            LOG.error("Exception in reading sideInputs", t);
            // Record Errors as well as Exceptions. The Future returned by submit() is never inspected, so an
            // unrecorded Error would leave the wait loop below without any failure signal.
            sideInputException = t instanceof Exception
                ? (Exception) t
                : new SamzaException("Error in reading sideInputs", t);
          }
        });

        // Make the main thread wait until all sideInputs have been caughtup or an exception was thrown
        while (!shouldShutdown && sideInputException == null &&
            !awaitSideInputTasks(sideInputTaskLatches)) {
          LOG.debug("Waiting for SideInput bootstrap to complete");
        }

        if (sideInputException != null) { // Throw exception if there was an exception in catching-up sideInputs
          throw new SamzaException("Exception in restoring sideInputs", sideInputException);
        }

      } catch (InterruptedException e) {
        LOG.warn("Received an interrupt during side inputs store restoration."
            + " Exiting prematurely without completing store restore.");
        /*
         * We want to stop side input restoration and rethrow the exception upstream. Container should handle the
         * interrupt exception and shutdown the components and cleaning up the resource. We don't want to clean up the
         * resources prematurely here.
         *
         * NOTE(review): the thread's interrupt flag is not restored before wrapping. Confirm that the container
         * detects the interrupt via the SamzaException cause rather than via Thread.interrupted().
         */
        shouldShutdown = true; // todo: should we cancel the flush future right away or wait for container to handle it as part of shutdown sequence?
        throw new SamzaException("Side inputs read was interrupted", e);
      }

      LOG.info("SideInput Restore complete");
    }
  }