private def newSession()

in pekko-sample-cluster-client-grpc-scala/src/main/scala/sample/cluster/client/grpc/ClusterClient.scala [54:105]


  private def newSession(
      settings: ClusterClientSettings,
      receptionistServiceClient: ClusterClientReceptionistServiceClient,
      sender: ActorRef,
      killSwitch: SharedKillSwitch,
      log: LoggingAdapter,
      serialization: ClusterClientSerialization)(implicit mat: Materializer): Future[ActorRef] = {
    val sessionReqRefPromise = Promise[ActorRef]()
    log.info("New session for {}", sender)
    receptionistServiceClient
      .newSession(
        Source
          .actorRef[Any](
            bufferSize = settings.bufferSize,
            overflowStrategy = OverflowStrategy.dropNew,
            // never complete from stream element
            completionMatcher = PartialFunction.empty,
            // never fail from stream element
            failureMatcher = PartialFunction.empty)
          // .actorRef[Any](bufferSize = settings.bufferSize, overflowStrategy = OverflowStrategy.dropNew)
          .via(killSwitch.flow)
          .map {
            case send: Send =>
              val payload = serialization.serializePayload(send.msg)
              Req().withSend(
                SendReq(send.path, send.localAffinity, Some(payload)))
            case sendToAll: SendToAll =>
              val payload = serialization.serializePayload(sendToAll.msg)
              Req().withSendToAll(SendToAllReq(sendToAll.path, Some(payload)))
            case publish: Publish =>
              val payload = serialization.serializePayload(publish.msg)
              Req().withPublish(PublishReq(publish.topic, Some(payload)))
          }
          .mapMaterializedValue(sessionReqRef => {
            sessionReqRefPromise.success(sessionReqRef)
            NotUsed
          }))
      .watch(sender) // end session when original sender terminates
      .recoverWithRetries(-1,
        {
          case _: WatchedActorTerminatedException => Source.empty
        })
      .map { rsp =>
        serialization.deserializePayload(rsp.payload.get)
      }
      .runForeach(sender ! _)
      .onComplete { result =>
        log.info("Session completed for {} with {}", sender, result)
      }(mat.executionContext)

    sessionReqRefPromise.future
  }