def hasStopped()

in samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala [748:870]


  def hasStopped(): Boolean = status == SamzaContainerStatus.STOPPED || status == SamzaContainerStatus.FAILED

  def run {
    try {
      info("Starting container.")

      if (containerListener != null) {
        containerListener.beforeStart()
      }

      val startTime = System.nanoTime()
      status = SamzaContainerStatus.STARTING
      if (jobConfig.getJMXEnabled) {
        jmxServer = new JmxServer()
      }
      applicationContainerContextOption.foreach(_.start)

      startMetrics
      startDiagnostics
      startAdmins
      startDrainMonitor
      startOffsetManager
      storeContainerLocality
      // TODO HIGH pmaheshw SAMZA-2338: since store restore needs to trim changelog messages,
      // need to start changelog producers before the stores, but stop them after stores.
      startProducers
      val taskCheckpoints = startStores
      startTableManager
      startDiskSpaceMonitor
      startHostStatisticsMonitor
      startTask(taskCheckpoints)
      startConsumers
      startSecurityManger

      info("Entering run loop.")
      status = SamzaContainerStatus.STARTED
      if (containerListener != null) {
        containerListener.afterStart()
      }
      metrics.containerStartupTime.set((System.nanoTime() - startTime)/1000000)
      if (taskInstances.size > 0)
        runLoop.run
      else
        standbyContainerShutdownLatch.await() // Standby containers do not spin runLoop, instead they wait on signal to invoke shutdown
    } catch {
      case e: InterruptedException =>
        /*
         * We don't want to categorize interrupts as failure since the only place the container thread gets interrupted within
         * our code inside stream processor is during the following two scenarios
         *    1. During a re-balance, if the container has not started or hasn't reported start status to StreamProcessor.
         *       Subsequently stream processor attempts to interrupt the container thread before proceeding to join the barrier
         *       to agree on the new work assignment.
         *    2. During shutdown signals to stream processor (external or internal), the stream processor signals the container to
         *       shutdown and waits for `task.shutdown.ms` before forcefully shutting down the container executor service which in
         *       turn interrupts the container thread.
         *
         * In the both of these scenarios, the failure cause is either captured externally (timing out scenario) or internally
         * (failed attempt to shut down the container). The act of interrupting the container thread is an explicit intent to shutdown
         * the container since it is not capable of reacting to shutdown signals in all scenarios.
         *
         */
        if (status.equals(SamzaContainerStatus.STARTED)) {
          warn("Received an interrupt in run loop.", e)
        } else {
          warn("Received an interrupt during initialization.", e)
        }
      case e: Throwable =>
        if (status.equals(SamzaContainerStatus.STARTED)) {
          error("Caught exception/error in run loop.", e)
        } else {
          error("Caught exception/error while initializing container.", e)
        }
        status = SamzaContainerStatus.FAILED
        exceptionSeen = e
    }

    try {
      info("Shutting down SamzaContainer.")
      if (jmxServer != null) {
        jmxServer.stop
      }

      shutdownConsumers
      shutdownTask
      shutdownDrainMonitor
      shutdownTableManager
      shutdownStores
      shutdownDiskSpaceMonitor
      shutdownHostStatisticsMonitor
      shutdownProducers
      shutdownOffsetManager
      shutdownDiagnostics
      shutdownMetrics
      shutdownSecurityManger
      shutdownAdmins

      applicationContainerContextOption.foreach(_.stop)

      if (!status.equals(SamzaContainerStatus.FAILED)) {
        status = SamzaContainerStatus.STOPPED
      }

      info("Shutdown complete.")
    } catch {
      case e: Throwable =>
        error("Caught exception/error while shutting down container.", e)
        if (exceptionSeen == null) {
          exceptionSeen = e
        }
        status = SamzaContainerStatus.FAILED
    }

    status match {
      case SamzaContainerStatus.STOPPED =>
        if (containerListener != null) {
          containerListener.afterStop()
        }
      case SamzaContainerStatus.FAILED =>
        if (containerListener != null) {
          containerListener.afterFailure(exceptionSeen)
        }
    }
  }