public void start()

in modules/mqtt-ext/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java [145:252]


    public void start() throws IgniteException {
        if (!stopped)
            throw new IgniteException("Attempted to start an already started MQTT Streamer");

        // For simplicity, if these are null initialize to empty lists.
        topics = topics == null ? new ArrayList<String>() : topics;

        qualitiesOfService = qualitiesOfService == null ? new ArrayList<Integer>() : qualitiesOfService;

        try {
            Map<String, Object> logValues = new HashMap<>();

            // Parameter validations.
            A.notNull(getStreamer(), "streamer");
            A.notNull(getIgnite(), "ignite");
            A.ensure(!(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null),
                "tuple extractor missing");
            A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null,
                "cannot provide both single and multiple tuple extractor");
            A.notNullOrEmpty(brokerUrl, "broker URL");

            // If the client ID is empty, generate one.
            if (clientId == null || clientId.length() == 0)
                clientId = MqttClient.generateClientId();

            // If we have both a single topic and a list of topics (but the list of topic is not of
            // size 1 and == topic, as this would be a case of re-initialization), fail.
            if (topic != null && topic.length() > 0 && !topics.isEmpty() &&
                topics.size() != 1 && !topics.get(0).equals(topic))
                throw new IllegalArgumentException("Cannot specify both a single topic and a list at the same time.");

            // Same as above but for QoS.
            if (qualityOfService != null && !qualitiesOfService.isEmpty() && qualitiesOfService.size() != 1 &&
                !qualitiesOfService.get(0).equals(qualityOfService))
                throw new IllegalArgumentException("Cannot specify both a single QoS and a list at the same time.");

            // Paho API requires disconnect timeout if providing a quiesce timeout and disconnecting forcibly.
            if (disconnectForcibly && disconnectQuiesceTimeout != null)
                A.notNull(disconnectForciblyTimeout, "disconnect timeout cannot be null when disconnecting forcibly " +
                    "with quiesce");

            // If we have multiple topics.
            if (!topics.isEmpty()) {
                for (String t : topics)
                    A.notNullOrEmpty(t, "topic in list of topics");

                A.ensure(qualitiesOfService.isEmpty() || qualitiesOfService.size() == topics.size(),
                    "qualities of service must be either empty or have the same size as topics list");

                logValues.put("topics", topics);
            }
            else {
                // Just the single topic.
                topics.add(topic);

                if (qualityOfService != null)
                    qualitiesOfService.add(qualityOfService);

                logValues.put("topic", topic);
            }

            // Finish building log values.
            logValues.put("brokerUrl", brokerUrl);
            logValues.put("clientId", clientId);

            // Cache log values.
            cachedLogValues = "[" + Joiner.on(", ").withKeyValueSeparator("=").join(logValues) + "]";

            // Create logger.
            log = getIgnite().log();

            // Create the MQTT client.
            if (persistence == null)
                client = new MqttClient(brokerUrl, clientId);
            else
                client = new MqttClient(brokerUrl, clientId, persistence);

            // Set this as a callback.
            client.setCallback(this);

            // Set stopped to false, as the connection will start async.
            stopped = false;

            // Build retrier.
            Retryer<Void> retrier = RetryerBuilder.<Void>newBuilder()
                .retryIfResult(new Predicate<Void>() {
                    @Override public boolean apply(Void v) {
                        return !client.isConnected() && !stopped;
                    }
                })
                .retryIfException().retryIfRuntimeException()
                .withWaitStrategy(retryWaitStrategy)
                .withStopStrategy(retryStopStrategy)
                .build();

            // Create the connection retrier.
            connectionRetrier = new MqttConnectionRetrier(retrier);

            if (log.isInfoEnabled())
                log.info("Starting MQTT Streamer " + cachedLogValues);

            // Connect.
            connectionRetrier.connect();
        }
        catch (Exception e) {
            throw new IgniteException("Failed to initialize MQTT Streamer.", e);
        }
    }