in pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClient.java [139:213]
private static CompletionStage<ActorRef> newSession(
ClusterClientSettings settings,
ClusterClientReceptionistServiceClient receptionistServiceClient,
ActorRef sender,
SharedKillSwitch killSwitch,
LoggingAdapter log,
ClusterClientSerialization serialization,
Materializer mat) {
CompletableFuture<ActorRef> sessionReqRefPromise = new CompletableFuture<>();
log.info("New session for {}", sender);
receptionistServiceClient
.newSession(
Source
.actorRef(
// never complete from stream element
elem -> Optional.empty(),
// never fail from stream element
elem -> Optional.empty(),
settings.bufferSize,
OverflowStrategy.dropNew()
)
.via(killSwitch.flow())
.map(msg -> {
if (msg instanceof Send) {
Send send = (Send) msg;
Payload payload = serialization.serializePayload(send.msg);
return Req.newBuilder()
.setSend(SendReq.newBuilder()
.setPath(send.path)
.setLocalAffinity(send.localAffinity)
.setPayload(payload))
.build();
} else if (msg instanceof SendToAll) {
SendToAll sendToAll = (SendToAll) msg;
Payload payload = serialization.serializePayload(sendToAll.msg);
return Req.newBuilder()
.setSendToAll(SendToAllReq.newBuilder()
.setPath(sendToAll.path)
.setPayload(payload))
.build();
} else if (msg instanceof Publish) {
Publish publish = (Publish) msg;
Payload payload = serialization.serializePayload(publish.msg);
return Req.newBuilder()
.setPublish(PublishReq.newBuilder()
.setTopic(publish.topic)
.setPayload(payload))
.build();
} else {
throw new IllegalArgumentException("Unknown message type: " + msg.getClass());
}
}
)
.mapMaterializedValue(sessionReqRef -> {
sessionReqRefPromise.complete(sessionReqRef);
return NotUsed.getInstance();
}
))
.watch(sender) // end session when original sender terminates
.recoverWithRetries(-1, WatchedActorTerminatedException.class, Source::empty)
.map(rsp ->
serialization.deserializePayload(rsp.getPayload())
)
.runForeach(msg -> sender.tell(msg, ActorRef.noSender()), mat)
.whenComplete((result, exc) -> {
if (exc == null)
log.info("Session completed successfully for {}: {}", sender, result);
else
log.info("Session completed with failure for {}: {}", sender, exc);
});
return sessionReqRefPromise;
}