public void connect()

in modules/mqtt-ext/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java [651:698]


        /**
         * Submits a connect-and-subscribe task to {@code exec} and, when {@code blockUntilConnected} is set,
         * waits for that task to finish.
         * <p>
         * The task does nothing if the client is already connected or if {@code stopped} is set. Otherwise it
         * connects to the broker (using {@code connectOptions} when they are non-null) and subscribes to all
         * configured topics, passing the configured QoS values when there are any.
         *
         * @throws RuntimeException If {@code blockUntilConnected} is set and waiting for the task fails. The
         *      original failure is kept as the cause. If the wait was interrupted, the calling thread's
         *      interrupt flag is restored before the exception is thrown.
         */
        public void connect() {
            // NOTE(review): retrier.wrap(...) presumably re-invokes the callable on failure according to the
            // configured retry policy - confirm against the retrier setup.
            Callable<Void> callable = retrier.wrap(new Callable<Void>() {
                @Override public Void call() throws Exception {
                    // If we're already connected, return immediately.
                    if (client.isConnected())
                        return null;

                    // Do not (re-)connect once a stop has been requested.
                    if (stopped)
                        return null;

                    // Connect to broker.
                    if (connectOptions == null)
                        client.connect();
                    else
                        client.connect(connectOptions);

                    // Always use the multiple topics variant of the mqtt client; even if the user specified a single
                    // topic and/or QoS, the initialization code would have placed it inside the 1..n structures.
                    if (qualitiesOfService.isEmpty())
                        client.subscribe(topics.toArray(new String[0]));

                    else {
                        // The client takes QoS values as a primitive array, so unbox the configured list.
                        int[] qoses = new int[qualitiesOfService.size()];

                        for (int i = 0; i < qualitiesOfService.size(); i++)
                            qoses[i] = qualitiesOfService.get(i);

                        client.subscribe(topics.toArray(new String[0]), qoses);
                    }

                    if (log.isInfoEnabled())
                        log.info("MQTT Streamer (re-)connected and subscribed " + cachedLogValues);

                    return null;
                }
            });

            Future<Void> result = exec.submit(callable);

            if (blockUntilConnected) {
                try {
                    result.get();
                }
                catch (InterruptedException e) {
                    // Restore the interrupt flag so that code further up the stack can still observe the
                    // interruption; the submitted task itself is not cancelled here.
                    Thread.currentThread().interrupt();

                    throw new RuntimeException(e);
                }
                catch (Throwable e) {
                    throw new RuntimeException(e);
                }
            }
        }