in pekko-sample-cluster-client-grpc-scala/src/main/scala/sample/cluster/client/grpc/ClusterClientReceptionistGrpcImpl.scala [25:73]
/**
 * Opens a new client session over a bidirectional stream.
 *
 * The returned source is backed by an actor (`sessionRspRef`); every message sent to that
 * actor is serialized and emitted as an [[Rsp]]. When the returned source is materialized,
 * the inbound request stream `in` is run and each request is forwarded to `pubSubMediator`
 * as a `Send`, `SendToAll` or `Publish` message, with `sessionRspRef` as the sender.
 *
 * A request of an unknown type, or one that carries no payload, throws an
 * `IllegalArgumentException` inside the request-processing stream. Termination of the
 * request stream (successful or failed) is only logged here; this method does not itself
 * complete the response source.
 *
 * @param in stream of requests received from the client
 * @return stream of serialized responses for the client
 */
override def newSession(in: Source[Req, NotUsed]): Source[Rsp, NotUsed] = {
  val sessionId = UUID.randomUUID().toString
  log.info("New session [{}]", sessionId)

  // The payload field is an Option on the request messages, so a client may omit it.
  // Fail with a descriptive error instead of an opaque `None.get`.
  def requirePayload[A](payload: Option[A], requestType: String): A =
    payload.getOrElse(
      throw new IllegalArgumentException(s"Missing payload in $requestType request"))

  Source
    .actorRef[Any](
      bufferSize = settings.bufferSize,
      overflowStrategy = OverflowStrategy.dropNew,
      // never complete from stream element
      completionMatcher = PartialFunction.empty,
      // never fail from stream element
      failureMatcher = PartialFunction.empty)
    .map { rsp =>
      val payload = serialization.serializePayload(rsp)
      Rsp(Some(payload))
    }
    .mapMaterializedValue { sessionRspRef =>
      in.runForeach { req =>
          if (req.req.isSend) {
            val sendReq = req.getSend
            val msg = serialization.deserializePayload(requirePayload(sendReq.payload, "Send"))
            // using sessionRspRef as sender so that replies are emitted to the response stream back to the client
            pubSubMediator.tell(
              DistributedPubSubMediator.Send(sendReq.path, msg, sendReq.localAffinity),
              sessionRspRef)
          } else if (req.req.isSendToAll) {
            val sendToAllReq = req.getSendToAll
            val msg =
              serialization.deserializePayload(requirePayload(sendToAllReq.payload, "SendToAll"))
            pubSubMediator.tell(
              DistributedPubSubMediator.SendToAll(sendToAllReq.path, msg),
              sessionRspRef)
          } else if (req.req.isPublish) {
            val publishReq = req.getPublish
            val msg =
              serialization.deserializePayload(requirePayload(publishReq.payload, "Publish"))
            pubSubMediator.tell(
              DistributedPubSubMediator.Publish(publishReq.topic, msg),
              sessionRspRef)
          } else {
            throw new IllegalArgumentException("Unknown request type")
          }
        }
        .onComplete { result =>
          // result is the Try of the request stream: Success on normal completion,
          // Failure when the stream failed (including the exceptions thrown above).
          log.info("Session [{}] completed: {}", sessionId, result)
        }(mat.executionContext)
      // The Future returned by runForeach is handled above; the response source exposes no materialized value.
      NotUsed
    }
}