in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala [326:391]
/**
 * Behavior used while a connection identified by `data.connectionId` is waiting for a
 * `ConnAckReceivedFromRemote` event.
 *
 * On entry a single-shot `ReceiveConnAckTimeout` timer is started with
 * `data.settings.receiveConnAckTimeout`, unless a timer with the same key is already active
 * (this method re-enters itself whenever it stashes an event).
 *
 * Event handling, in match order:
 *  - a `ConnectReceivedLocally` for a different connection id is re-sent to this actor and
 *    `disconnect` is invoked;
 *  - any other event carrying a non-empty connection id that differs from ours is ignored;
 *  - a conn-ack whose return code contains `ConnectionAccepted` is forwarded via `local`, the
 *    timer is cancelled and `serverConnected` is started through `BehaviorRunner.run`, which
 *    is also handed the stashed events;
 *  - any other conn-ack is still forwarded via `local`, then `disconnect` is invoked;
 *  - `ReceiveConnAckTimeout` fails `data.remote` with `ConnectFailed`, then `disconnect`;
 *  - `ConnectionLost` cancels the timer, then `disconnect`;
 *  - everything else is appended to `data.stash`.
 *
 * Signals: `Terminated` is ignored; `PostStop` completes `data.remote`.
 *
 * @param data state for the pending connection
 * @param mat  materializer, passed on implicitly to the follow-up behaviors
 */
def serverConnect(data: ConnectReceived)(implicit mat: Materializer): Behavior[Event] = Behaviors.withTimers {
  // Key under which the conn-ack timeout timer is registered.
  val connAckTimerKey = "receive-connack"

  timers =>
    if (!timers.isTimerActive(connAckTimerKey)) {
      timers.startSingleTimer(
        connAckTimerKey,
        ReceiveConnAckTimeout(data.connectionId),
        data.settings.receiveConnAckTimeout)
    }

    // State handed to `serverConnected`; a `def` so it is only built on the accepted path.
    def connectedState =
      ConnAckReceived(
        data.connectionId,
        data.connect.connectFlags,
        data.connect.keepAlive,
        pendingPingResp = false,
        data.remote,
        Vector.empty,
        data.activeConsumers,
        data.activeProducers,
        data.pendingLocalPublications,
        data.pendingRemotePublications,
        data.consumerPacketRouter,
        data.producerPacketRouter,
        data.subscriberPacketRouter,
        data.unsubscriberPacketRouter,
        data.settings)

    Behaviors
      .receivePartial[Event] {
        // A local connect for another connection: queue it for later and tear this one down.
        case (ctx, newConnect @ ConnectReceivedLocally(otherConnectionId, _, _, _))
            if otherConnectionId != data.connectionId =>
          ctx.self ! newConnect
          disconnect(ctx, data.remote, data)

        // Events addressed to some other connection are dropped.
        case (_, foreign) if foreign.connectionId.nonEmpty && foreign.connectionId != data.connectionId =>
          Behaviors.same

        // Accepted conn-ack: forward it, stop the timeout and move on to the connected behavior.
        case (ctx, ConnAckReceivedFromRemote(_, ack, reply))
            if ack.returnCode.contains(ConnAckReturnCode.ConnectionAccepted) =>
          reply.success(ForwardConnAck(data.connectData))
          timers.cancel(connAckTimerKey)
          BehaviorRunner.run(
            serverConnected(connectedState),
            ctx,
            data.stash.map(BehaviorRunner.StoredMessage.apply))

        // Any other conn-ack is still forwarded, but the connection is torn down.
        case (ctx, ConnAckReceivedFromRemote(_, _, reply)) =>
          reply.success(ForwardConnAck(data.connectData))
          timers.cancel(connAckTimerKey)
          disconnect(ctx, data.remote, data)

        // No conn-ack arrived in time.
        case (ctx, ReceiveConnAckTimeout(_)) =>
          data.remote.fail(ConnectFailed)
          timers.cancel(connAckTimerKey)
          disconnect(ctx, data.remote, data)

        case (ctx, ConnectionLost(_)) =>
          timers.cancel(connAckTimerKey)
          disconnect(ctx, data.remote, data)

        // Anything else is kept in the stash while we stay in this behavior.
        case (_, pending) =>
          serverConnect(data.copy(stash = data.stash :+ pending))
      }
      .receiveSignal {
        case (_, _: Terminated) =>
          Behaviors.same
        case (_, PostStop) =>
          data.remote.complete()
          Behaviors.same
      }
}