in pekko-sample-cluster-client-grpc-scala/src/main/scala/sample/cluster/client/grpc/ClusterClient.scala [54:105]
private def newSession(
    settings: ClusterClientSettings,
    receptionistServiceClient: ClusterClientReceptionistServiceClient,
    sender: ActorRef,
    killSwitch: SharedKillSwitch,
    log: LoggingAdapter,
    serialization: ClusterClientSerialization)(implicit mat: Materializer): Future[ActorRef] = {
  val sessionReqRefPromise = Promise[ActorRef]()
  log.info("New session for {}", sender)
  receptionistServiceClient
    .newSession(
      Source
        .actorRef[Any](
          bufferSize = settings.bufferSize,
          overflowStrategy = OverflowStrategy.dropNew,
          // never complete from stream element
          completionMatcher = PartialFunction.empty,
          // never fail from stream element
          failureMatcher = PartialFunction.empty)
        .via(killSwitch.flow)
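        // translate the client-side message protocol (Send/SendToAll/Publish)
        // into gRPC protobuf requests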
        .map {
          case send: Send =>
            val payload = serialization.serializePayload(send.msg)
            Req().withSend(SendReq(send.path, send.localAffinity, Some(payload)))
          case sendToAll: SendToAll =>
            val payload = serialization.serializePayload(sendToAll.msg)
            Req().withSendToAll(SendToAllReq(sendToAll.path, Some(payload)))
          case publish: Publish =>
            val payload = serialization.serializePayload(publish.msg)
            Req().withPublish(PublishReq(publish.topic, Some(payload)))
        }
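        // complete the promise with the Source's materialized ActorRef so the
        // caller can feed messages into the session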
        .mapMaterializedValue { sessionReqRef =>
          sessionReqRefPromise.success(sessionReqRef)
          NotUsed
        })
    .watch(sender) // end session when original sender terminates
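    // termination of the watched sender surfaces as WatchedActorTerminatedException;
    // complete the response stream gracefully instead of failing it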
    .recoverWithRetries(-1, {
      case _: WatchedActorTerminatedException => Source.empty
    })
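    // deserialize each response payload and deliver it to the original sender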
    .map { rsp =>
      serialization.deserializePayload(rsp.payload.get)
    }
    .runForeach(sender ! _)
    .onComplete { result =>
      log.info("Session completed for {} with {}", sender, result)
    }(mat.executionContext)
  sessionReqRefPromise.future
}
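
For orientation, a minimal caller sketch follows. It is not part of ClusterClient.scala: the values settings, receptionistServiceClient, senderRef, killSwitch, log and serialization are assumed to be in scope, as is an implicit Materializer, and Send is the client message protocol case class whose fields (path, msg, localAffinity) appear in the map stage above.

// Hypothetical caller (illustrative names only, assumed values in scope):
def startSession()(implicit mat: Materializer): Unit =
  newSession(settings, receptionistServiceClient, senderRef, killSwitch, log, serialization)
    .foreach { sessionRef =>
      // messages sent to the session ActorRef flow through the gRPC request stream
      sessionRef ! Send("/user/serviceA/worker1", msg = "hello", localAffinity = true)
    }(mat.executionContext)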