public void cleanShutdown()

in uReplicator-Worker/src/main/java/com/uber/stream/ureplicator/worker/WorkerInstance.java [223:276]


  /**
   * Shuts down this worker instance and releases the components it holds.
   *
   * <p>The sequence is: producer manager, observer, fetcher manager (consumer), pending consumer
   * chunks, checkpoint manager, message queue / consumer streams, metrics, metrics reporter.
   * A failure while shutting down any one of the managers is logged and the remaining steps
   * still run, so a single failing component cannot leave the rest of the worker half-stopped.
   *
   * @param force when {@code true} the shutdown sequence runs unconditionally and the
   *     {@code isShuttingDown} flag is neither checked nor modified; when {@code false} the
   *     sequence runs only if this call is the one that flips {@code isShuttingDown} from
   *     {@code false} to {@code true}
   */
  public void cleanShutdown(boolean force) {
    // Short-circuit keeps the original semantics: the CAS is only attempted when not forced.
    if (!force && !isShuttingDown.compareAndSet(false, true)) {
      LOGGER.info("worker instance already shutdown");
      return;
    }

    LOGGER.info("Start clean shutdown");

    if (producerManager != null) {
      try {
        LOGGER.info("Shutdown producer manager");
        producerManager.cleanShutdown();
      } catch (Exception e) {
        // Log and continue, otherwise the observer, consumer and metrics would never be stopped.
        LOGGER.error("Failed to shut down producer manager", e);
      }
    }

    if (observer != null) {
      try {
        LOGGER.info("Shutdown observer");
        observer.shutdown();
      } catch (Exception e) {
        LOGGER.error("Failed to shut down observer", e);
      } finally {
        observer = null;
      }
    }

    if (fetcherManager != null) {
      try {
        LOGGER.info("Shutdown Consumer");
        fetcherManager.shutdown();
      } catch (Exception e) {
        LOGGER.error("Failed to shut down consumer", e);
      }
    }

    for (ConsumerIterator iterator : consumerStream) {
      iterator.cleanCurrentChunk();
    }

    if (checkpointManager != null) {
      try {
        checkpointManager.shutdown();
      } catch (Exception e) {
        LOGGER.error("Failed to shut down checkpoint manager", e);
      } finally {
        // Drop the reference even when shutdown failed, mirroring the observer handling above.
        checkpointManager = null;
      }
    }
    messageTransformer = null;

    messageQueue.clear();
    consumerStream.clear();
    removeMetrics();

    LOGGER.info("stopping metrics reporter");
    KafkaUReplicatorMetricsReporter.stop();

    LOGGER.info("Kafka uReplicator worker shutdown successfully");
    isRunning.set(false);
  }