in mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala [163:238]
def onFailure(token: IMqttToken, ex: Throwable): Unit =
onSubscribe.invoke(Failure(ex))
}
)
} else {
log.debug(
"Client [{}] connected to broker [{}] without subscriptions",
connectionSettings.clientId,
connectionSettings.broker
)
subscriptionPromise.complete(SuccessfullyDone)
pull(in)
}
})
/**
 * Async callback that fails the stage with the throwable it receives.
 * Judging by its name it is meant for connection-loss notifications; the
 * place where it is invoked is outside this section of the file.
 */
private val onConnectionLost: AsyncCallback[Throwable] =
  getAsyncCallback[Throwable](cause => failStageWith(cause))
/**
 * Async callback handling one incoming [[MqttMessageWithAck]]:
 *
 *  - if `out` is available the message is pushed downstream straight away;
 *  - otherwise, if buffering it would take `queue` beyond `bufferSize`, the stage is failed;
 *  - otherwise the message is appended to `queue`.
 */
private val onMessageAsyncCallback: AsyncCallback[MqttMessageWithAck] =
  getAsyncCallback[MqttMessageWithAck] { incoming =>
    // Evaluated only when the message cannot be pushed directly.
    def wouldExceedBuffer: Boolean = queue.size + 1 > bufferSize

    if (isAvailable(out))
      pushDownstream(incoming)
    else if (wouldExceedBuffer)
      failStageWith(new RuntimeException(s"Reached maximum buffer size [$bufferSize]"))
    else
      queue.enqueue(incoming)
  }
/**
 * Async callback receiving the outcome associated with an [[IMqttToken]]
 * (by its name, the result of a publish).
 *
 * A success requests the next element from `in` unless a pull is already
 * pending; a failure fails the stage with the reported exception.
 */
private val onPublished: AsyncCallback[Try[IMqttToken]] =
  getAsyncCallback[Try[IMqttToken]] { outcome =>
    outcome match {
      case Failure(cause) =>
        failStageWith(cause)
      case Success(_) =>
        // Avoid a double pull: only ask for more when no pull is outstanding.
        if (!hasBeenPulled(in)) pull(in)
    }
  }
/**
 * Translates the connector's offline-persistence settings into Paho's
 * [[DisconnectedBufferOptions]].
 *
 * Buffering is always switched on; buffer size, the delete-oldest flag and
 * the persist-buffer flag are copied from `settings`.
 *
 * @param settings offline persistence settings to convert
 * @return a freshly allocated options object carrying those values
 */
private def createPahoBufferOptions(settings: MqttOfflinePersistenceSettings): DisconnectedBufferOptions = {
  val options = new DisconnectedBufferOptions()
  options.setBufferEnabled(true)
  options.setBufferSize(settings.bufferSize)
  options.setDeleteOldestMessages(settings.deleteOldestMessage)
  options.setPersistBuffer(settings.persistBuffer)
  options
}
/**
 * The underlying Paho asynchronous client, constructed from the broker,
 * client id and persistence values held by `connectionSettings`.
 */
private val client: MqttAsyncClient = {
  val brokerUri = connectionSettings.broker
  val clientIdentifier = connectionSettings.clientId
  val persistenceStore = connectionSettings.persistence
  new MqttAsyncClient(brokerUri, clientIdentifier, persistenceStore)
}
/**
 * The Paho client to use for all operations, with disconnected-buffer
 * options applied when `connectionSettings.offlinePersistence` is defined.
 *
 * This is a `lazy val` rather than a `def` so the options are built and
 * installed exactly once. As a `def`, every access (each publish, each
 * acknowledgement, ...) allocated a new [[DisconnectedBufferOptions]] and
 * called `setBufferOpts` again. NOTE(review): in Paho, `setBufferOpts`
 * appears to install a new disconnected-message buffer on each call, which
 * would discard messages buffered while offline — confirm against the Paho
 * version in use.
 *
 * Laziness (instead of a strict `val`) keeps this safe with respect to
 * initialization order for any reference that appears earlier in the class.
 */
private lazy val mqttClient: MqttAsyncClient = {
  connectionSettings.offlinePersistence.foreach { bufferOpts =>
    client.setBufferOpts(createPahoBufferOptions(bufferOpts))
  }
  client
}
/**
 * Async callback that acknowledges a received message.
 *
 * Steps, in order:
 *  1. tell Paho the message identified by `args.messageId` / `args.qos` has been fully processed;
 *  2. decrement `unackedMessages`; if that reaches zero and either `out` is closed, or `in` is
 *     closed and `queue` is empty, complete the stage;
 *  3. complete `args.promise` with `SuccessfullyDone`.
 *
 * Anything thrown along the way is delivered to `args.promise` as a failure instead.
 */
private val commitCallback: AsyncCallback[CommitCallbackArguments] =
  getAsyncCallback[CommitCallbackArguments] { args =>
    try {
      mqttClient.messageArrivedComplete(args.messageId, args.qos.value)
      val stillUnacked = unackedMessages.decrementAndGet()
      if (stillUnacked == 0) {
        // Only inspect the ports once the last outstanding ack has arrived.
        val nothingLeftToDo = isClosed(out) || (isClosed(in) && queue.isEmpty)
        if (nothingLeftToDo) completeStage()
      }
      args.promise.complete(SuccessfullyDone)
    } catch {
      case ex: Throwable => args.promise.failure(ex)
    }
  }
mqttClient.setCallback(
new MqttCallback {
override def messageArrived(topic: String, pahoMessage: PahoMqttMessage): Unit = {
backpressurePahoClient.acquire()
val message = new MqttMessageWithAck {