in uReplicator-Worker/src/main/java/com/uber/stream/ureplicator/worker/WorkerInstance.java [223:276]
public void cleanShutdown(boolean force) {
if (force || isShuttingDown.compareAndSet(false, true)) {
if (producerManager != null) {
LOGGER.info("Shutdown producer manager");
producerManager.cleanShutdown();
}
LOGGER.info("Start clean shutdown");
if (observer != null) {
try {
LOGGER.info("Shutdown observer");
observer.shutdown();
} catch (Exception e) {
LOGGER.error("Failed to shut down observer", e);
} finally {
observer = null;
}
}
if (fetcherManager != null) {
try {
LOGGER.info("Shutdown Consumer");
fetcherManager.shutdown();
} catch (Exception e) {
LOGGER.error("Failed to shut down consumer", e);
}
}
for (ConsumerIterator iterator : consumerStream) {
iterator.cleanCurrentChunk();
}
if (checkpointManager != null) {
checkpointManager.shutdown();
checkpointManager = null;
}
messageTransformer = null;
messageQueue.clear();
consumerStream.clear();
removeMetrics();
LOGGER.info("stopping metrics reporter");
KafkaUReplicatorMetricsReporter.stop();
LOGGER.info("Kafka uReplicator worker shutdown successfully");
isRunning.set(false);
} else {
LOGGER.info("worker instance already shutdown");
return;
}
}