in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ServerState.scala [129:206]
/**
 * Builds the behavior used while the server is handling client connections.
 *
 * Connect requests get a `ClientConnection` child, which is spawned, or reused when a child with the
 * derived name already exists. Connection-scoped events are handed to `forward` together with the
 * current connection table, and locally received publishes are sent to every tracked connection.
 * When a tracked child terminates, its connection id is unregistered from the packet routers and a
 * `ClientSessionTerminated` is offered to `data.terminations`.
 *
 * @param data the state carried by this behavior: the packet routers, the settings, the terminations
 *             queue and the table of client connections keyed by connection id
 * @param mat  implicit materializer, available to any call in this body that requires one
 * @return the behavior handling [[Event]] messages and child termination signals
 */
def listening(data: Data)(implicit mat: Materializer): Behavior[Event] = Behaviors.setup { context =>
  // Handles the termination of a child: looks up the connection that owns the terminated actor,
  // unregisters it from the routers and reports the ended session before listening again.
  def childTerminated(terminatedCc: ActorRef[ClientConnection.Event]): Behavior[Event] = {
    val terminatedEntry = data.clientConnections.collectFirst {
      case (connectionId, (clientId, cc)) if cc == terminatedCc => (connectionId, clientId)
    }
    terminatedEntry match {
      case None =>
        // The terminated actor is not in the connection table, so there is nothing to clean up.
        Behaviors.same
      case Some((connectionId, clientId)) =>
        data.consumerPacketRouter ! RemotePacketRouter.UnregisterConnection(connectionId)
        data.publisherPacketRouter ! RemotePacketRouter.UnregisterConnection(connectionId)
        data.unpublisherPacketRouter ! RemotePacketRouter.UnregisterConnection(connectionId)
        val sessionTerminatedOffer = data.terminations.offer(ClientSessionTerminated(clientId))
        val remainingConnections = data.clientConnections - connectionId
        // Resume listening, without the terminated connection, once the offer has completed.
        QueueOfferState.waitForQueueOfferCompleted(
          sessionTerminatedOffer,
          result => QueueOfferCompleted(connectionId, result.toEither),
          listening(data.copy(clientConnections = remainingConnections)),
          stash = Vector.empty)
    }
  }

  Behaviors
    .receiveMessagePartial[Event] {
      case ConnectReceivedFromRemote(connectionId, connect, local) =>
        // One child per client id: the child name is derived from the client id of the connect.
        val childName = ActorName.mkName(ClientConnectionNamePrefix + connect.clientId)
        val connectionActor: ActorRef[ClientConnection.Event] = context.child(childName) match {
          case Some(existing) =>
            // A child with that name already exists: pass the new connect on to it.
            val known = existing.unsafeUpcast[ClientConnection.Event]
            known ! ClientConnection.ConnectReceivedFromRemote(connect, local)
            known
          case None =>
            context.spawn(
              ClientConnection(
                connect,
                local,
                data.consumerPacketRouter,
                data.producerPacketRouter,
                data.publisherPacketRouter,
                data.unpublisherPacketRouter,
                data.settings),
              childName)
        }
        context.watch(connectionActor)
        data.consumerPacketRouter ! RemotePacketRouter.RegisterConnection(connectionId, connect.clientId)
        data.publisherPacketRouter ! RemotePacketRouter.RegisterConnection(connectionId, connect.clientId)
        data.unpublisherPacketRouter ! RemotePacketRouter.RegisterConnection(connectionId, connect.clientId)
        // Drop every entry recorded for the same client id, then record the new connection.
        val otherClients = data.clientConnections.filter {
          case (_, (clientId, _)) => clientId != connect.clientId
        }
        listening(
          data.copy(clientConnections = otherClients.updated(connectionId, (connect.clientId, connectionActor))))

      case ConnAckReceivedLocally(connectionId, connAck, remote) =>
        forward(
          connectionId,
          data.clientConnections,
          ClientConnection.ConnAckReceivedLocally(connAck, remote))

      case SubscribeReceivedFromRemote(connectionId, subscribe, local) =>
        forward(
          connectionId,
          data.clientConnections,
          ClientConnection.SubscribeReceivedFromRemote(subscribe, local))

      case PublishReceivedFromRemote(connectionId, publish, local) =>
        forward(
          connectionId,
          data.clientConnections,
          ClientConnection.PublishReceivedFromRemote(publish, local))

      case PublishReceivedLocally(publish, publishData) =>
        // A local publish carries no connection id: send it to every tracked client connection.
        data.clientConnections.foreach {
          case (_, (_, cc)) => cc ! ClientConnection.PublishReceivedLocally(publish, publishData)
        }
        Behaviors.same

      case UnsubscribeReceivedFromRemote(connectionId, unsubscribe, local) =>
        forward(
          connectionId,
          data.clientConnections,
          ClientConnection.UnsubscribeReceivedFromRemote(unsubscribe, local))

      case PingReqReceivedFromRemote(connectionId, local) =>
        forward(
          connectionId,
          data.clientConnections,
          ClientConnection.PingReqReceivedFromRemote(local))

      case DisconnectReceivedFromRemote(connectionId, local) =>
        forward(
          connectionId,
          data.clientConnections,
          ClientConnection.DisconnectReceivedFromRemote(local))

      case ConnectionLost(connectionId) =>
        forward(connectionId, data.clientConnections, ClientConnection.ConnectionLost)
    }
    .receiveSignal {
      case (_, Terminated(child)) =>
        childTerminated(child.unsafeUpcast[ClientConnection.Event])
      // NOTE(review): Pekko documents ChildFailed as a subtype of Terminated, so this case is
      // presumably shadowed by the one above - confirm before relying on or reordering it.
      case (_, ChildFailed(child, failure)) if failure == ClientConnection.ClientConnectionFailed =>
        childTerminated(child.unsafeUpcast[ClientConnection.Event])
    }
}