in modules/mqtt-ext/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java [651:698]
/**
 * Submits a connect-and-subscribe task to the executor and, if {@code blockUntilConnected} is set, waits for
 * that task to finish.
 * <p>
 * The task does nothing when the client is already connected or when the streamer has been stopped. Otherwise
 * it connects to the broker (using {@code connectOptions} when they are set) and subscribes to all configured
 * topics, passing the configured QoS levels when any are present.
 *
 * @throws RuntimeException If {@code blockUntilConnected} is set and waiting for the task fails or is
 *      interrupted; the original failure is preserved as the cause. On interruption the calling thread's
 *      interrupt flag is restored before the exception is thrown.
 */
public void connect() {
    // NOTE(review): retrier.wrap presumably adds retry behavior around the task - confirm against the retrier setup.
    Callable<Void> callable = retrier.wrap(new Callable<Void>() {
        @Override public Void call() throws Exception {
            // If we're already connected, return immediately.
            if (client.isConnected())
                return null;

            // Do not (re-)connect once the streamer has been stopped.
            if (stopped)
                return null;

            // Connect to broker.
            if (connectOptions == null)
                client.connect();
            else
                client.connect(connectOptions);

            // Always use the multiple topics variant of the mqtt client; even if the user specified a single
            // topic and/or QoS, the initialization code would have placed it inside the 1..n structures.
            if (qualitiesOfService.isEmpty())
                client.subscribe(topics.toArray(new String[0]));
            else {
                // Unbox the configured QoS values into the primitive array expected by subscribe().
                int[] qoses = new int[qualitiesOfService.size()];

                for (int i = 0; i < qualitiesOfService.size(); i++)
                    qoses[i] = qualitiesOfService.get(i);

                client.subscribe(topics.toArray(new String[0]), qoses);
            }

            if (log.isInfoEnabled())
                log.info("MQTT Streamer (re-)connected and subscribed " + cachedLogValues);

            return null;
        }
    });

    Future<Void> result = exec.submit(callable);

    // Fire-and-forget mode: the task keeps running on the executor.
    if (!blockUntilConnected)
        return;

    try {
        result.get();
    }
    catch (InterruptedException e) {
        // Restore the interrupt flag so that code further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();

        throw new RuntimeException(e);
    }
    catch (Throwable e) {
        throw new RuntimeException(e);
    }
}