private static CompletionStage newSession()

in pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClient.java [139:213]


  /**
   * Opens a new receptionist session on behalf of {@code sender}.
   *
   * <p>The request side of the session is a {@code Source.actorRef} whose materialized
   * {@link ActorRef} is handed back to the caller through the returned stage. {@code Send},
   * {@code SendToAll} and {@code Publish} messages sent to that ref are serialized and forwarded
   * as {@code Req} messages; any other message type raises an {@link IllegalArgumentException}
   * inside the stream. Every response payload is deserialized and told to {@code sender}.
   *
   * <p>The result of the response stream itself is only logged; it is not returned.
   *
   * @param settings provides the buffer size of the request source
   * @param receptionistServiceClient client used to open the session
   * @param sender actor that receives the responses; the session ends when it terminates
   * @param killSwitch shared kill switch whose flow is inserted into the request stream
   * @param log logging adapter used for session lifecycle messages
   * @param serialization serializes request payloads and deserializes response payloads
   * @param mat materializer used to run the response stream
   * @return a stage completed with the request-side {@link ActorRef} once the request source has
   *     been materialized, or completed exceptionally if the session terminates before that
   *     happened
   */
  private static CompletionStage<ActorRef> newSession(
      ClusterClientSettings settings,
      ClusterClientReceptionistServiceClient receptionistServiceClient,
      ActorRef sender,
      SharedKillSwitch killSwitch,
      LoggingAdapter log,
      ClusterClientSerialization serialization,
      Materializer mat) {

    CompletableFuture<ActorRef> sessionReqRefPromise = new CompletableFuture<>();

    log.info("New session for {}", sender);
    receptionistServiceClient
      .newSession(
        Source
          .actorRef(
            // never complete from stream element
            elem -> Optional.empty(),
            // never fail from stream element
            elem -> Optional.empty(),
            settings.bufferSize,
            OverflowStrategy.dropNew()
          )
          .via(killSwitch.flow())
          // translate the client-side message into the matching protobuf request
          .map(msg -> {
            if (msg instanceof Send) {
              Send send = (Send) msg;
              Payload payload = serialization.serializePayload(send.msg);
              return Req.newBuilder()
                .setSend(SendReq.newBuilder()
                  .setPath(send.path)
                  .setLocalAffinity(send.localAffinity)
                  .setPayload(payload))
                .build();
            } else if (msg instanceof SendToAll) {
              SendToAll sendToAll = (SendToAll) msg;
              Payload payload = serialization.serializePayload(sendToAll.msg);
              return Req.newBuilder()
                .setSendToAll(SendToAllReq.newBuilder()
                  .setPath(sendToAll.path)
                  .setPayload(payload))
                .build();
            } else if (msg instanceof Publish) {
              Publish publish = (Publish) msg;
              Payload payload = serialization.serializePayload(publish.msg);
              return Req.newBuilder()
                .setPublish(PublishReq.newBuilder()
                  .setTopic(publish.topic)
                  .setPayload(payload))
                .build();
            } else {
              throw new IllegalArgumentException("Unknown message type: " + msg.getClass());
            }
          })
          // expose the request-side ActorRef to the caller through the promise
          .mapMaterializedValue(sessionReqRef -> {
            sessionReqRefPromise.complete(sessionReqRef);
            return NotUsed.getInstance();
          }))
      .watch(sender) // end session when original sender terminates
      // sender termination is an expected way to end the session, not a failure
      .recoverWithRetries(-1, WatchedActorTerminatedException.class, Source::empty)
      .map(rsp ->
        serialization.deserializePayload(rsp.getPayload())
      )
      .runForeach(msg -> sender.tell(msg, ActorRef.noSender()), mat)
      .whenComplete((result, exc) -> {
        if (exc == null) {
          log.info("Session completed successfully for {}: {}", sender, result);
        } else {
          log.info("Session completed with failure for {}: {}", sender, exc);
        }
        // Safety net: if the session ended without the request source ever being
        // materialized, fail the promise so callers do not wait on it forever.
        if (!sessionReqRefPromise.isDone()) {
          sessionReqRefPromise.completeExceptionally(
            exc != null
              ? exc
              : new IllegalStateException(
                  "Session for " + sender + " completed before its request stream was materialized"));
        }
      });

    return sessionReqRefPromise;
  }