in samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala [748:870]
/**
 * Reports whether the container's current status is either STOPPED or FAILED.
 *
 * @return true when `status` equals `SamzaContainerStatus.STOPPED` or
 *         `SamzaContainerStatus.FAILED`; false for any other value.
 */
def hasStopped(): Boolean = status match {
  case SamzaContainerStatus.STOPPED | SamzaContainerStatus.FAILED => true
  case _ => false
}
/**
 * Drives the container through its whole lifecycle on the calling thread.
 *
 * The method proceeds in three phases:
 *  1. Start-up: notifies the listener (if any) via `beforeStart`, moves `status` to STARTING,
 *     starts every component in a fixed order, moves `status` to STARTED, notifies the listener
 *     via `afterStart`, records the start-up duration in milliseconds, and then either runs the
 *     run loop (when there are task instances) or blocks on the standby shutdown latch.
 *  2. Shutdown: always attempted once phase 1 returns or is caught, stopping components in a
 *     fixed order. `status` becomes STOPPED unless a failure was already recorded.
 *  3. Notification: the listener (if any) receives `afterStop` or `afterFailure(exceptionSeen)`
 *     depending on the final `status`.
 *
 * An `InterruptedException` in phase 1 is only logged and does not mark the container FAILED.
 * Any other `Throwable` in phase 1 or phase 2 sets `status` to FAILED; the first one observed
 * is retained in `exceptionSeen`.
 */
def run {
  // ---- Phase 1: start-up, then block in the run loop (or on the standby latch). ----
  try {
    info("Starting container.")
    Option(containerListener).foreach(_.beforeStart())

    val startTime = System.nanoTime()
    status = SamzaContainerStatus.STARTING
    if (jobConfig.getJMXEnabled) {
      jmxServer = new JmxServer()
    }

    // Component start-up; the order of these calls is significant and must not be changed.
    applicationContainerContextOption.foreach(_.start)
    startMetrics
    startDiagnostics
    startAdmins
    startDrainMonitor
    startOffsetManager
    storeContainerLocality
    // TODO HIGH pmaheshw SAMZA-2338: store restore needs to trim changelog messages, so the
    // changelog producers must be started before the stores, but stopped after the stores.
    startProducers
    val restoredCheckpoints = startStores
    startTableManager
    startDiskSpaceMonitor
    startHostStatisticsMonitor
    startTask(restoredCheckpoints)
    startConsumers
    startSecurityManger

    info("Entering run loop.")
    status = SamzaContainerStatus.STARTED
    Option(containerListener).foreach(_.afterStart())

    // Start-up duration is published in milliseconds (nanoseconds / 1,000,000).
    val startupMillis = (System.nanoTime() - startTime) / 1000000
    metrics.containerStartupTime.set(startupMillis)

    if (taskInstances.size > 0) {
      runLoop.run
    } else {
      // Standby containers do not spin the run loop; they wait for a signal to invoke shutdown.
      standbyContainerShutdownLatch.await()
    }
  } catch {
    case e: InterruptedException =>
      /*
       * Interrupts are deliberately not categorized as failures. Within the stream processor,
       * the container thread is only interrupted in the following two scenarios:
       *  1. During a re-balance, when the container has not started or has not yet reported its
       *     start status to StreamProcessor. The stream processor then interrupts the container
       *     thread before joining the barrier to agree on the new work assignment.
       *  2. On a shutdown signal to the stream processor (external or internal), the stream
       *     processor signals the container to shut down and waits for `task.shutdown.ms` before
       *     forcefully shutting down the container executor service, which in turn interrupts
       *     the container thread.
       *
       * In both scenarios the failure cause is captured either externally (the time-out case)
       * or internally (the failed attempt to shut down the container). Interrupting the
       * container thread is an explicit request to shut the container down, because the
       * container cannot react to shutdown signals in every situation.
       */
      val interruptMessage =
        if (status.equals(SamzaContainerStatus.STARTED)) "Received an interrupt in run loop."
        else "Received an interrupt during initialization."
      warn(interruptMessage, e)

    case e: Throwable =>
      // Pick the log message from the phase we were in, before the status is overwritten.
      val failureMessage =
        if (status.equals(SamzaContainerStatus.STARTED)) "Caught exception/error in run loop."
        else "Caught exception/error while initializing container."
      error(failureMessage, e)
      status = SamzaContainerStatus.FAILED
      exceptionSeen = e
  }

  // ---- Phase 2: shutdown, attempted regardless of how phase 1 ended. ----
  try {
    info("Shutting down SamzaContainer.")
    Option(jmxServer).foreach(_.stop)

    // Component shutdown; the order of these calls is significant and must not be changed.
    shutdownConsumers
    shutdownTask
    shutdownDrainMonitor
    shutdownTableManager
    shutdownStores
    shutdownDiskSpaceMonitor
    shutdownHostStatisticsMonitor
    shutdownProducers
    shutdownOffsetManager
    shutdownDiagnostics
    shutdownMetrics
    shutdownSecurityManger
    shutdownAdmins
    applicationContainerContextOption.foreach(_.stop)

    // A failure recorded during phase 1 must survive a clean shutdown.
    if (!status.equals(SamzaContainerStatus.FAILED)) {
      status = SamzaContainerStatus.STOPPED
    }
    info("Shutdown complete.")
  } catch {
    case e: Throwable =>
      error("Caught exception/error while shutting down container.", e)
      // Keep the earliest recorded exception; only record this one if nothing was seen before.
      if (exceptionSeen == null) {
        exceptionSeen = e
      }
      status = SamzaContainerStatus.FAILED
  }

  // ---- Phase 3: report the final outcome to the listener, if one is registered. ----
  status match {
    case SamzaContainerStatus.STOPPED =>
      Option(containerListener).foreach(_.afterStop())
    case SamzaContainerStatus.FAILED =>
      Option(containerListener).foreach(_.afterFailure(exceptionSeen))
  }
}