in modules/mqtt-ext/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java [145:252]
/**
 * Starts the streamer: validates the configuration, creates the MQTT client, registers this
 * instance as its callback and triggers the connection through the connection retrier.
 * <p>
 * Either a single topic or a list of topics may be configured (and likewise a single QoS or a list
 * of QoS values). Supplying both is rejected, except when the list holds exactly one element equal
 * to the single value, which is the state a previous call to this method leaves behind.
 *
 * @throws IgniteException If the streamer is already started, or if validation or initialization
 *      fails (the original failure is attached as the cause).
 */
public void start() throws IgniteException {
    if (!stopped)
        throw new IgniteException("Attempted to start an already started MQTT Streamer");

    // For simplicity, if these are null initialize to empty lists.
    if (topics == null)
        topics = new ArrayList<>();

    if (qualitiesOfService == null)
        qualitiesOfService = new ArrayList<>();

    try {
        Map<String, Object> logValues = new HashMap<>();

        // Parameter validations.
        A.notNull(getStreamer(), "streamer");
        A.notNull(getIgnite(), "ignite");
        A.ensure(!(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null),
            "tuple extractor missing");
        A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null,
            "cannot provide both single and multiple tuple extractor");
        A.notNullOrEmpty(brokerUrl, "broker URL");

        // If the client ID is empty, generate one.
        if (clientId == null || clientId.length() == 0)
            clientId = MqttClient.generateClientId();

        // If we have both a single topic and a list of topics, fail - unless the list consists of exactly
        // that single topic, which is what a previous start() leaves behind (re-initialization).
        if (topic != null && topic.length() > 0 && !topics.isEmpty() &&
            !(topics.size() == 1 && topic.equals(topics.get(0))))
            throw new IllegalArgumentException("Cannot specify both a single topic and a list at the same time.");

        // Same as above but for QoS.
        if (qualityOfService != null && !qualitiesOfService.isEmpty() &&
            !(qualitiesOfService.size() == 1 && qualityOfService.equals(qualitiesOfService.get(0))))
            throw new IllegalArgumentException("Cannot specify both a single QoS and a list at the same time.");

        // Paho API requires disconnect timeout if providing a quiesce timeout and disconnecting forcibly.
        if (disconnectForcibly && disconnectQuiesceTimeout != null)
            A.notNull(disconnectForciblyTimeout, "disconnect timeout cannot be null when disconnecting forcibly " +
                "with quiesce");

        // If we have multiple topics.
        if (!topics.isEmpty()) {
            for (String t : topics)
                A.notNullOrEmpty(t, "topic in list of topics");

            A.ensure(qualitiesOfService.isEmpty() || qualitiesOfService.size() == topics.size(),
                "qualities of service must be either empty or have the same size as topics list");

            logValues.put("topics", topics);
        }
        else {
            // Just the single topic. Validate it like the list entries above, before it is stored,
            // so that a missing topic is reported clearly and never ends up in the topics list.
            A.notNullOrEmpty(topic, "topic");

            topics.add(topic);

            if (qualityOfService != null)
                qualitiesOfService.add(qualityOfService);

            logValues.put("topic", topic);
        }

        // Finish building log values.
        logValues.put("brokerUrl", brokerUrl);
        logValues.put("clientId", clientId);

        // Cache log values.
        cachedLogValues = "[" + Joiner.on(", ").withKeyValueSeparator("=").join(logValues) + "]";

        // Create logger.
        log = getIgnite().log();

        // Create the MQTT client.
        if (persistence == null)
            client = new MqttClient(brokerUrl, clientId);
        else
            client = new MqttClient(brokerUrl, clientId, persistence);

        // Set this as a callback.
        client.setCallback(this);

        // Set stopped to false, as the connection will start async.
        stopped = false;

        // Build retrier: keep retrying while the client is not connected and the streamer was not stopped.
        Retryer<Void> retrier = RetryerBuilder.<Void>newBuilder()
            .retryIfResult(new Predicate<Void>() {
                @Override public boolean apply(Void v) {
                    return !client.isConnected() && !stopped;
                }
            })
            .retryIfException().retryIfRuntimeException()
            .withWaitStrategy(retryWaitStrategy)
            .withStopStrategy(retryStopStrategy)
            .build();

        // Create the connection retrier.
        connectionRetrier = new MqttConnectionRetrier(retrier);

        if (log.isInfoEnabled())
            log.info("Starting MQTT Streamer " + cachedLogValues);

        // Connect.
        connectionRetrier.connect();
    }
    catch (Exception e) {
        // Roll back the started flag: a failure after it was flipped must not leave the streamer
        // marked as started, otherwise a subsequent start() would be rejected.
        stopped = true;

        throw new IgniteException("Failed to initialize MQTT Streamer.", e);
    }
}