def onFailure()

in mqttv5/src/main/scala/org/apache/pekko/stream/connectors/mqttv5/impl/MqttFlowStage.scala [163:238]


            def onFailure(token: IMqttToken, ex: Throwable): Unit =
              onSubscribe.invoke(Failure(ex))
          }
        )
      } else {
        log.debug(
          "Client [{}] connected to broker [{}] without subscriptions",
          connectionSettings.clientId,
          connectionSettings.broker
        )
        subscriptionPromise.complete(SuccessfullyDone)
        pull(in)
      }
    })

  private val onConnectionLost: AsyncCallback[Throwable] = getAsyncCallback[Throwable](failStageWith)

  private val onMessageAsyncCallback: AsyncCallback[MqttMessageWithAck] =
    getAsyncCallback[MqttMessageWithAck] { message =>
      if (isAvailable(out)) {
        pushDownstream(message)
      } else if (queue.size + 1 > bufferSize) {
        failStageWith(new RuntimeException(s"Reached maximum buffer size [$bufferSize]"))
      } else {
        queue.enqueue(message)
      }
    }

  private val onPublished: AsyncCallback[Try[IMqttToken]] = getAsyncCallback[Try[IMqttToken]] {
    case Success(_)  => if (!hasBeenPulled(in)) pull(in)
    case Failure(ex) => failStageWith(ex)
  }

  private def createPahoBufferOptions(settings: MqttOfflinePersistenceSettings): DisconnectedBufferOptions = {
    val disconnectedBufferOptions = new DisconnectedBufferOptions()

    disconnectedBufferOptions.setBufferEnabled(true)
    disconnectedBufferOptions.setBufferSize(settings.bufferSize)
    disconnectedBufferOptions.setDeleteOldestMessages(settings.deleteOldestMessage)
    disconnectedBufferOptions.setPersistBuffer(settings.persistBuffer)

    disconnectedBufferOptions
  }

  private val client = new MqttAsyncClient(
    connectionSettings.broker,
    connectionSettings.clientId,
    connectionSettings.persistence
  )

  private def mqttClient: MqttAsyncClient = connectionSettings.offlinePersistence match {
    case Some(bufferOpts) =>
      client.setBufferOpts(createPahoBufferOptions(bufferOpts))
      client

    case _ =>
      client
  }

  private val commitCallback: AsyncCallback[CommitCallbackArguments] =
    getAsyncCallback[CommitCallbackArguments]((args: CommitCallbackArguments) =>
      try {
        mqttClient.messageArrivedComplete(args.messageId, args.qos.value)
        if (unackedMessages.decrementAndGet() == 0 && (isClosed(out) || (isClosed(in) && queue.isEmpty)))
          completeStage()
        args.promise.complete(SuccessfullyDone)
      } catch {
        case ex: Throwable => args.promise.failure(ex)
      }
    )

  mqttClient.setCallback(
    new MqttCallback {
      override def messageArrived(topic: String, pahoMessage: PahoMqttMessage): Unit = {
        backpressurePahoClient.acquire()
        val message = new MqttMessageWithAck {