def serverConnect()

in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala [326:391]


  def serverConnect(data: ConnectReceived)(implicit mat: Materializer): Behavior[Event] = Behaviors.withTimers {
    val ReceiveConnAck = "receive-connack"

    timer =>
      if (!timer.isTimerActive(ReceiveConnAck))
        timer.startSingleTimer(ReceiveConnAck,
          ReceiveConnAckTimeout(data.connectionId),
          data.settings.receiveConnAckTimeout)
      Behaviors
        .receivePartial[Event] {
          case (context, connect @ ConnectReceivedLocally(connectionId, _, _, _))
              if connectionId != data.connectionId =>
            context.self ! connect
            disconnect(context, data.remote, data)
          case (_, event) if event.connectionId.nonEmpty && event.connectionId != data.connectionId =>
            Behaviors.same
          case (context, ConnAckReceivedFromRemote(_, connAck, local))
              if connAck.returnCode.contains(ConnAckReturnCode.ConnectionAccepted) =>
            local.success(ForwardConnAck(data.connectData))

            timer.cancel(ReceiveConnAck)

            BehaviorRunner.run(
              serverConnected(
                ConnAckReceived(
                  data.connectionId,
                  data.connect.connectFlags,
                  data.connect.keepAlive,
                  pendingPingResp = false,
                  data.remote,
                  Vector.empty,
                  data.activeConsumers,
                  data.activeProducers,
                  data.pendingLocalPublications,
                  data.pendingRemotePublications,
                  data.consumerPacketRouter,
                  data.producerPacketRouter,
                  data.subscriberPacketRouter,
                  data.unsubscriberPacketRouter,
                  data.settings)),
              context,
              data.stash.map(BehaviorRunner.StoredMessage.apply))

          case (context, ConnAckReceivedFromRemote(_, _, local)) =>
            local.success(ForwardConnAck(data.connectData))
            timer.cancel(ReceiveConnAck)
            disconnect(context, data.remote, data)
          case (context, ReceiveConnAckTimeout(_)) =>
            data.remote.fail(ConnectFailed)
            timer.cancel(ReceiveConnAck)
            disconnect(context, data.remote, data)
          case (context, ConnectionLost(_)) =>
            timer.cancel(ReceiveConnAck)
            disconnect(context, data.remote, data)
          case (_, e) =>
            serverConnect(data.copy(stash = data.stash :+ e))
        }
        .receiveSignal {
          case (_, _: Terminated) =>
            Behaviors.same
          case (_, PostStop) =>
            data.remote.complete()
            Behaviors.same
        }

  }