in core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java [284:311]
public void stop() {
LOG.info("Stopping CamelSourceTask connector task");
try {
if (consumer != null) {
consumer.stop();
} else {
LOG.warn("A critical error may have occurred and there is no consumer to stop");
}
} catch (Exception e) {
LOG.error("Error stopping camel consumer: {}", e.getMessage());
}
try {
/*
* If the CamelMainSupport instance fails to be instantiated (ie.:
* due to missing classes or similar issues) then it won't be
* assigned and de-referencing it could cause an NPE.
*/
if (cms != null) {
cms.stop();
} else {
LOG.warn("A fatal exception may have occurred and the Camel main was not created");
}
} catch (Exception e) {
throw new ConnectException("Failed to stop Camel context", e);
} finally {
LOG.info("CamelSourceTask connector task stopped");
}
}