in samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java [475:561]
/**
 * Builds the {@link JobCoordinatorListener} through which the job coordinator drives this stream processor.
 *
 * <p>Every callback performs its state changes while holding {@code lock}. The {@code processorListener}
 * callbacks ({@code afterStop} / {@code afterFailure}) are invoked only after {@code lock} has been released.
 *
 * @return a new listener bound to this stream processor's state
 */
private JobCoordinatorListener createJobCoordinatorListener() {
  return new JobCoordinatorListener() {

    /**
     * Reacts to the expiry of the current job model.
     *
     * <ul>
     *   <li>{@code STARTED} / {@code RUNNING}: moves to {@code IN_REBALANCE} and stops the container. If the
     *       container does not stop, moves to {@code STOPPING} and stops the job coordinator.</li>
     *   <li>{@code IN_REBALANCE} with a container present: interrupts the container and shuts down the
     *       executor service. On success a fresh executor service is created; otherwise moves to
     *       {@code STOPPING} and stops the job coordinator.</li>
     *   <li>{@code IN_REBALANCE} without a container, or any other state: the notification is ignored.</li>
     * </ul>
     */
    @Override
    public void onJobModelExpired() {
      synchronized (lock) {
        if (state == State.STARTED || state == State.RUNNING) {
          // Flip the state before stopping the container so that the rebalance is visible while shutdown runs.
          state = State.IN_REBALANCE;
          LOGGER.info("Job model expired. Shutting down the container: {} of stream processor: {}.", container, processorId);
          boolean hasContainerShutdown = stopSamzaContainer();
          if (!hasContainerShutdown) {
            LOGGER.warn("Container: {} shutdown was unsuccessful. Stopping the stream processor: {}.", container, processorId);
            state = State.STOPPING;
            jobCoordinator.stop();
          } else {
            LOGGER.info("Container: {} shutdown completed for stream processor: {}.", container, processorId);
          }
        } else if (state == State.IN_REBALANCE) {
          if (container != null) {
            boolean hasContainerShutdown = interruptContainerAndShutdownExecutorService();
            if (!hasContainerShutdown) {
              LOGGER.warn("Job model expire unsuccessful. Failed to interrupt container: {} safely. "
                  + "Stopping the stream processor: {}", container, processorId);
              state = State.STOPPING;
              jobCoordinator.stop();
            } else {
              // The previous executor service was shut down above; replace it for the next container.
              containerExecutorService = createExecutorService();
            }
          } else {
            LOGGER.info("Ignoring Job model expired since a rebalance is already in progress");
          }
        } else {
          LOGGER.info("Ignoring onJobModelExpired invocation since the current state is {} and not in {}.", state,
              ImmutableList.of(State.RUNNING, State.STARTED, State.IN_REBALANCE));
        }
      }
    }

    /**
     * Creates and launches a container for the new job model. Only acted upon in {@code IN_REBALANCE};
     * in any other state the notification is logged and ignored.
     *
     * @param processorId identifier passed to {@code createSamzaContainer}; shadows the enclosing
     *                    {@code processorId} inside this method
     * @param jobModel    the job model the new container is created from
     */
    @Override
    public void onNewJobModel(String processorId, JobModel jobModel) {
      synchronized (lock) {
        if (state == State.IN_REBALANCE) {
          // A fresh latch is installed before the new container is created and submitted.
          containerShutdownLatch = new CountDownLatch(1);
          container = createSamzaContainer(processorId, jobModel);
          container.setContainerListener(new ContainerListener());
          LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId);
          containerExecutorService.submit(container);
        } else {
          LOGGER.info("Ignoring onNewJobModel invocation since the current state is {} and not {}.", state, State.IN_REBALANCE);
        }
      }
    }

    /**
     * Stops the container and marks the processor {@code STOPPED}, then notifies the processor listener:
     * {@code afterFailure} when a container exception has been recorded, {@code afterStop} otherwise.
     */
    @Override
    public void onCoordinatorStop() {
      synchronized (lock) {
        LOGGER.info("Shutting down the executor service of the stream processor: {}.", processorId);
        stopContainerAndMarkStopped();
      }

      // Read the shared field once so the null check and the callback observe the same value.
      final Throwable failure = containerException;
      if (failure != null) {
        processorListener.afterFailure(failure);
      } else {
        processorListener.afterStop();
      }
    }

    /**
     * Stops the container and marks the processor {@code STOPPED}, then reports the coordinator failure
     * to the processor listener.
     *
     * @param throwable the failure reported by the job coordinator
     */
    @Override
    public void onCoordinatorFailure(Throwable throwable) {
      synchronized (lock) {
        // Logged at ERROR: this failure stops the stream processor.
        LOGGER.error(String.format("Coordinator: %s failed with an exception. Stopping the stream processor: %s. Original exception:", jobCoordinator, processorId), throwable);
        stopContainerAndMarkStopped();
      }
      processorListener.afterFailure(throwable);
    }

    /**
     * Shared shutdown sequence for the coordinator stop and failure paths: stops the container,
     * interrupts the executor service when the container did not stop, and sets the state to
     * {@code STOPPED}. Both callers invoke this while holding {@code lock}.
     */
    private void stopContainerAndMarkStopped() {
      boolean hasContainerShutdown = stopSamzaContainer();
      // we only want to interrupt when container shutdown times out.
      if (!hasContainerShutdown) {
        containerExecutorService.shutdownNow();
      }
      state = State.STOPPED;
    }
  };
}