def listening()

in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ServerState.scala [129:206]


  def listening(data: Data)(implicit mat: Materializer): Behavior[Event] = Behaviors.setup { context =>
    def childTerminated(terminatedCc: ActorRef[ClientConnection.Event]): Behavior[Event] =
      data.clientConnections.find { case (_, (_, cc)) => cc == terminatedCc } match {
        case Some((connectionId, (clientId, _))) =>
          data.consumerPacketRouter ! RemotePacketRouter.UnregisterConnection(connectionId)
          data.publisherPacketRouter ! RemotePacketRouter.UnregisterConnection(connectionId)
          data.unpublisherPacketRouter ! RemotePacketRouter.UnregisterConnection(connectionId)

          QueueOfferState.waitForQueueOfferCompleted(
            data.terminations
              .offer(ClientSessionTerminated(clientId)),
            result => QueueOfferCompleted(connectionId, result.toEither),
            listening(data.copy(clientConnections = data.clientConnections - connectionId)),
            stash = Vector.empty)

        case None =>
          Behaviors.same
      }

    Behaviors
      .receiveMessagePartial[Event] {
        case ConnectReceivedFromRemote(connectionId, connect, local) =>
          val clientConnectionName = ActorName.mkName(ClientConnectionNamePrefix + connect.clientId)
          val clientConnection = context.child(clientConnectionName) match {
            case None =>
              context.spawn(
                ClientConnection(connect,
                  local,
                  data.consumerPacketRouter,
                  data.producerPacketRouter,
                  data.publisherPacketRouter,
                  data.unpublisherPacketRouter,
                  data.settings),
                clientConnectionName)

            case Some(ref) =>
              val cc = ref.unsafeUpcast[ClientConnection.Event]
              cc ! ClientConnection.ConnectReceivedFromRemote(connect, local)
              cc
          }
          context.watch(clientConnection)
          data.consumerPacketRouter ! RemotePacketRouter.RegisterConnection(connectionId, connect.clientId)
          data.publisherPacketRouter ! RemotePacketRouter.RegisterConnection(connectionId, connect.clientId)
          data.unpublisherPacketRouter ! RemotePacketRouter.RegisterConnection(connectionId, connect.clientId)
          val newConnection = (connectionId, (connect.clientId, clientConnection))
          listening(
            data.copy(
              clientConnections = data.clientConnections
                .filterNot { case (_, (clientId, _)) => clientId == connect.clientId } + newConnection))
        case ConnAckReceivedLocally(connectionId, connAck, remote) =>
          forward(connectionId, data.clientConnections, ClientConnection.ConnAckReceivedLocally(connAck, remote))
        case SubscribeReceivedFromRemote(connectionId, subscribe, local) =>
          forward(connectionId, data.clientConnections, ClientConnection.SubscribeReceivedFromRemote(subscribe, local))
        case PublishReceivedFromRemote(connectionId, publish, local) =>
          forward(connectionId, data.clientConnections, ClientConnection.PublishReceivedFromRemote(publish, local))
        case PublishReceivedLocally(publish, publishData) =>
          data.clientConnections.values.foreach {
            case (_, cc) => cc ! ClientConnection.PublishReceivedLocally(publish, publishData)
          }
          Behaviors.same
        case UnsubscribeReceivedFromRemote(connectionId, unsubscribe, local) =>
          forward(connectionId,
            data.clientConnections,
            ClientConnection.UnsubscribeReceivedFromRemote(unsubscribe, local))
        case PingReqReceivedFromRemote(connectionId, local) =>
          forward(connectionId, data.clientConnections, ClientConnection.PingReqReceivedFromRemote(local))
        case DisconnectReceivedFromRemote(connectionId, local) =>
          forward(connectionId, data.clientConnections, ClientConnection.DisconnectReceivedFromRemote(local))
        case ConnectionLost(connectionId) =>
          forward(connectionId, data.clientConnections, ClientConnection.ConnectionLost)
      }
      .receiveSignal {
        case (_, Terminated(ref)) =>
          childTerminated(ref.unsafeUpcast[ClientConnection.Event])
        case (_, ChildFailed(ref, failure)) if failure == ClientConnection.ClientConnectionFailed =>
          childTerminated(ref.unsafeUpcast[ClientConnection.Event])
      }
  }