private JobCoordinatorListener createJobCoordinatorListener()

in samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java [475:561]


  private JobCoordinatorListener createJobCoordinatorListener() {
    return new JobCoordinatorListener() {

      @Override
      public void onJobModelExpired() {
        synchronized (lock) {
          if (state == State.STARTED || state == State.RUNNING) {
            state = State.IN_REBALANCE;
            LOGGER.info("Job model expired. Shutting down the container: {} of stream processor: {}.", container, processorId);
            boolean hasContainerShutdown = stopSamzaContainer();
            if (!hasContainerShutdown) {
              LOGGER.warn("Container: {} shutdown was unsuccessful. Stopping the stream processor: {}.", container, processorId);
              state = State.STOPPING;
              jobCoordinator.stop();
            } else {
              LOGGER.info("Container: {} shutdown completed for stream processor: {}.", container, processorId);
            }
          } else if (state == State.IN_REBALANCE) {
            if (container != null) {
              boolean hasContainerShutdown = interruptContainerAndShutdownExecutorService();
              if (!hasContainerShutdown) {
                LOGGER.warn("Job model expire unsuccessful. Failed to interrupt container: {} safely. "
                    + "Stopping the stream processor: {}", container, processorId);
                state = State.STOPPING;
                jobCoordinator.stop();
              } else {
                containerExecutorService = createExecutorService();
              }
            } else {
              LOGGER.info("Ignoring Job model expired since a rebalance is already in progress");
            }
          } else {
            LOGGER.info("Ignoring onJobModelExpired invocation since the current state is {} and not in {}.", state,
                ImmutableList.of(State.RUNNING, State.STARTED, State.IN_REBALANCE));
          }
        }
      }

      @Override
      public void onNewJobModel(String processorId, JobModel jobModel) {
        synchronized (lock) {
          if (state == State.IN_REBALANCE) {
            containerShutdownLatch = new CountDownLatch(1);
            container = createSamzaContainer(processorId, jobModel);
            container.setContainerListener(new ContainerListener());
            LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId);
            containerExecutorService.submit(container);
          } else {
            LOGGER.info("Ignoring onNewJobModel invocation since the current state is {} and not {}.", state, State.IN_REBALANCE);
          }
        }
      }

      @Override
      public void onCoordinatorStop() {
        synchronized (lock) {
          LOGGER.info("Shutting down the executor service of the stream processor: {}.", processorId);
          boolean hasContainerShutdown = stopSamzaContainer();

          // we only want to interrupt when container shutdown times out.
          if (!hasContainerShutdown) {
            containerExecutorService.shutdownNow();
          }
          state = State.STOPPED;
        }
        if (containerException != null)
          processorListener.afterFailure(containerException);
        else
          processorListener.afterStop();
      }

      @Override
      public void onCoordinatorFailure(Throwable throwable) {
        synchronized (lock) {
          LOGGER.info(String.format("Coordinator: %s failed with an exception. Stopping the stream processor: %s. Original exception:", jobCoordinator, processorId), throwable);
          boolean hasContainerShutdown = stopSamzaContainer();

          // we only want to interrupt when container shutdown times out.
          if (!hasContainerShutdown) {
            containerExecutorService.shutdownNow();
          }
          state = State.STOPPED;
        }
        processorListener.afterFailure(throwable);
      }
    };
  }