in modules/mqtt-ext/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java [259:292]
public void stop() throws IgniteException {
if (stopped)
throw new IgniteException("Failed to stop MQTT Streamer (already stopped).");
// Stop the retrier.
connectionRetrier.stop();
try {
if (disconnectForcibly) {
if (disconnectQuiesceTimeout == null && disconnectForciblyTimeout == null)
client.disconnectForcibly();
else if (disconnectForciblyTimeout != null && disconnectQuiesceTimeout == null)
client.disconnectForcibly(disconnectForciblyTimeout);
else
client.disconnectForcibly(disconnectQuiesceTimeout, disconnectForciblyTimeout);
}
else {
if (disconnectQuiesceTimeout == null)
client.disconnect();
else
client.disconnect(disconnectQuiesceTimeout);
}
client.close();
stopped = true;
}
catch (Exception e) {
throw new IgniteException("Failed to stop Exception while stopping MQTT Streamer.", e);
}
}