override def newSession()

in pekko-sample-cluster-client-grpc-scala/src/main/scala/sample/cluster/client/grpc/ClusterClientReceptionistGrpcImpl.scala [24:72]


  override def newSession(in: Source[Req, NotUsed]): Source[Rsp, NotUsed] = {
    val sessionId = UUID.randomUUID().toString
    log.info("New session [{}]", sessionId)
    Source
      .actorRef[Any](
        bufferSize = settings.bufferSize,
        overflowStrategy = OverflowStrategy.dropNew,
        // never complete from stream element
        completionMatcher = PartialFunction.empty,
        // never fail from stream element
        failureMatcher = PartialFunction.empty)
      .map { rsp =>
        val payload = serialization.serializePayload(rsp)
        Rsp(Some(payload))
      }
      .mapMaterializedValue { sessionRspRef =>
        in.runForeach { req =>
          if (req.req.isSend) {
            val sendReq = req.getSend
            val msg = serialization.deserializePayload(sendReq.payload.get)
            // using sessionRspRef as sender so that replies are emitted to the response stream back to the client
            pubSubMediator.tell(
              DistributedPubSubMediator
                .Send(sendReq.path, msg, sendReq.localAffinity),
              sessionRspRef)
          } else if (req.req.isSendToAll) {
            val sendToAllReq = req.getSendToAll
            val msg =
              serialization.deserializePayload(sendToAllReq.payload.get)
            pubSubMediator.tell(
              DistributedPubSubMediator.SendToAll(sendToAllReq.path, msg),
              sessionRspRef)
          } else if (req.req.isPublish) {
            val publishReq = req.getPublish
            val msg = serialization.deserializePayload(publishReq.payload.get)
            pubSubMediator.tell(
              DistributedPubSubMediator.Publish(publishReq.topic, msg),
              sessionRspRef)
          } else {
            throw new IllegalArgumentException("Unknown request type")
          }

        }
          .onComplete { result =>
            log.info("Session [{}] completed: {}", sessionId, result)
          }(mat.executionContext)
        NotUsed
      }
  }