public Source newSession()

in pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionistGrpcImpl.java [33:83]


  /**
   * Starts a new session and returns the stream of responses belonging to it.
   *
   * <p>A random session id is generated and used only for logging. The returned source is backed
   * by an actor reference: every message sent to that reference is serialized and emitted as an
   * {@code Rsp}. The source is created with {@code settings.bufferSize} and the
   * {@code dropNew} overflow strategy.
   *
   * <p>When the returned source is materialized, the request stream {@code in} is run with
   * {@code materializer}. Each request is deserialized and handed to {@code pubSubMediator} as a
   * {@code Send}, {@code SendToAll} or {@code Publish} command, with the session's actor
   * reference as sender. The outcome of the request stream is logged when it terminates.
   *
   * @param in the stream of requests for this session
   * @return the stream of responses for this session
   */
  public Source<Rsp, NotUsed> newSession(Source<Req, NotUsed> in) {
    final String sessionId = UUID.randomUUID().toString();
    log.info("New session [{}]", sessionId);
    return Source
      .actorRef(
        // no stream element is ever treated as a completion signal
        ignored -> Optional.empty(),
        // no stream element is ever treated as a failure signal
        ignored -> Optional.empty(),
        settings.bufferSize,
        OverflowStrategy.dropNew())
      // whatever is sent to the session actor goes out to the client wrapped in an Rsp
      .map(reply -> {
        Payload serialized = serialization.serializePayload(reply);
        return Rsp.newBuilder().setPayload(serialized).build();
      })
      .mapMaterializedValue(replyTo -> {
        in.runForeach(request -> {
          final Req.ReqCase requestCase = request.getReqCase();

          // Translate the request into the matching mediator command.
          final Object mediatorCommand;
          if (requestCase == Req.ReqCase.SEND) {
            SendReq send = request.getSend();
            Object body = serialization.deserializePayload(send.getPayload());
            mediatorCommand =
              new DistributedPubSubMediator.Send(send.getPath(), body, send.getLocalAffinity());
          } else if (requestCase == Req.ReqCase.SENDTOALL) {
            SendToAllReq sendToAll = request.getSendToAll();
            Object body = serialization.deserializePayload(sendToAll.getPayload());
            mediatorCommand = new DistributedPubSubMediator.SendToAll(sendToAll.getPath(), body);
          } else if (requestCase == Req.ReqCase.PUBLISH) {
            PublishReq publish = request.getPublish();
            Object body = serialization.deserializePayload(publish.getPayload());
            mediatorCommand = new DistributedPubSubMediator.Publish(publish.getTopic(), body);
          } else {
            throw new IllegalArgumentException("Unknown request type");
          }

          // The session actor is the sender, so replies are emitted to the response stream
          // that goes back to the client.
          pubSubMediator.tell(mediatorCommand, replyTo);
        }, materializer)
          .whenComplete((done, failure) -> {
            if (failure != null) {
              log.info("Session [{}] completed with failure: {}", sessionId, failure);
            } else {
              log.info("Session [{}] completed successfully: {}", sessionId, done);
            }
          });

        // The actor reference stays internal; callers get no materialized value.
        return NotUsed.getInstance();
      });
  }