pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClient.java (198 lines of code) (raw):

package sample.cluster.client.grpc; import org.apache.pekko.NotUsed; import org.apache.pekko.actor.AbstractLoggingActor; import org.apache.pekko.actor.ActorRef; import org.apache.pekko.actor.Props; import org.apache.pekko.actor.Terminated; import org.apache.pekko.event.LoggingAdapter; import org.apache.pekko.japi.pf.ReceiveBuilder; import org.apache.pekko.pattern.Patterns; import org.apache.pekko.stream.KillSwitches; import org.apache.pekko.stream.Materializer; import org.apache.pekko.stream.OverflowStrategy; import org.apache.pekko.stream.SharedKillSwitch; import org.apache.pekko.stream.WatchedActorTerminatedException; import org.apache.pekko.stream.javadsl.Source; import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; /** * This actor is intended to be used on an external node that is not member * of the cluster. It acts like a gateway for sending messages to actors * somewhere in the cluster. With service discovery and Apache Pekko gRPC it will establish * a connection to a {@link ClusterClientReceptionist} somewhere in the cluster. * <p> * You can send messages via the `ClusterClient` to any actor in the cluster * that is registered in the {@link ClusterClientReceptionist}. * Messages are wrapped in {@link ClusterClient.Send}, {@link ClusterClient.SendToAll} * or {@link ClusterClient.Publish}. * <p> * 1. {@link ClusterClient.Send} - * The message will be delivered to one recipient with a matching path, if any such * exists. If several entries match the path the message will be delivered * to one random destination. The sender of the message can specify that local * affinity is preferred, i.e. the message is sent to an actor in the same local actor * system as the used receptionist actor, if any such exists, otherwise random to any other * matching entry. * <p> * 2. {@link ClusterClient.SendToAll} - * The message will be delivered to all recipients with a matching path. * <p> * 3. {@link ClusterClient.Publish} - * The message will be delivered to all recipients Actors that have been registered as subscribers to * to the named topic. * <p> * Use the factory method {@link ClusterClient#props ClusterClient.props}) to create the * `org.apache.pekko.actor.Props` for the actor. * <p> * If the receptionist is not currently available, the client will buffer the messages * and then deliver them when the connection to the receptionist has been established. * The size of the buffer is configurable and it can be disabled by using a buffer size * of 0. When the buffer is full old messages will be dropped when new messages are sent * via the client. * <p> * Note that this is a best effort implementation: messages can always be lost due to the distributed * nature of the actors involved. */ public class ClusterClient extends AbstractLoggingActor { /** * Factory method for `ClusterClient` `org.apache.pekko.actor.Props`. */ public static Props props(ClusterClientSettings settings, Materializer materializer) { return Props.create(ClusterClient.class, () -> new ClusterClient(settings, materializer)); } public interface Command {} public static class Send implements Command { public final String path; public final Object msg; public final boolean localAffinity; public Send(String path, Object msg, boolean localAffinity) { this.path = path; this.msg = msg; this.localAffinity = localAffinity; } /** * Convenience constructor with `localAffinity` false */ public Send(String path, Object msg) { this(path, msg, false); } } /** * More efficient than `Send` for single request-reply interaction */ public static class SendAsk implements Command { public final String path; public final Object msg; public final boolean localAffinity; public SendAsk(String path, Object msg, boolean localAffinity) { this.path = path; this.msg = msg; this.localAffinity = localAffinity; } /** * Convenience constructor with `localAffinity` false */ public SendAsk(String path, Object msg) { this(path, msg, false); } } public static class SendToAll implements Command { public final String path; public final Object msg; public SendToAll(String path, Object msg) { this.path = path; this.msg = msg; } } public static class Publish implements Command { public final String topic; public final Object msg; public Publish(String topic, Object msg) { this.topic = topic; this.msg = msg; } } private static ClusterClientReceptionistServiceClient createClientStub(ClusterClientSettings settings, Materializer mat) { return ClusterClientReceptionistServiceClient.create(settings.grpcClientSettings, mat.system()); } private static CompletionStage<ActorRef> newSession( ClusterClientSettings settings, ClusterClientReceptionistServiceClient receptionistServiceClient, ActorRef sender, SharedKillSwitch killSwitch, LoggingAdapter log, ClusterClientSerialization serialization, Materializer mat) { CompletableFuture<ActorRef> sessionReqRefPromise = new CompletableFuture<>(); log.info("New session for {}", sender); receptionistServiceClient .newSession( Source .actorRef( // never complete from stream element elem -> Optional.empty(), // never fail from stream element elem -> Optional.empty(), settings.bufferSize, OverflowStrategy.dropNew() ) .via(killSwitch.flow()) .map(msg -> { if (msg instanceof Send) { Send send = (Send) msg; Payload payload = serialization.serializePayload(send.msg); return Req.newBuilder() .setSend(SendReq.newBuilder() .setPath(send.path) .setLocalAffinity(send.localAffinity) .setPayload(payload)) .build(); } else if (msg instanceof SendToAll) { SendToAll sendToAll = (SendToAll) msg; Payload payload = serialization.serializePayload(sendToAll.msg); return Req.newBuilder() .setSendToAll(SendToAllReq.newBuilder() .setPath(sendToAll.path) .setPayload(payload)) .build(); } else if (msg instanceof Publish) { Publish publish = (Publish) msg; Payload payload = serialization.serializePayload(publish.msg); return Req.newBuilder() .setPublish(PublishReq.newBuilder() .setTopic(publish.topic) .setPayload(payload)) .build(); } else { throw new IllegalArgumentException("Unknown message type: " + msg.getClass()); } } ) .mapMaterializedValue(sessionReqRef -> { sessionReqRefPromise.complete(sessionReqRef); return NotUsed.getInstance(); } )) .watch(sender) // end session when original sender terminates .recoverWithRetries(-1, WatchedActorTerminatedException.class, Source::empty) .map(rsp -> serialization.deserializePayload(rsp.getPayload()) ) .runForeach(msg -> sender.tell(msg, ActorRef.noSender()), mat) .whenComplete((result, exc) -> { if (exc == null) log.info("Session completed successfully for {}: {}", sender, result); else log.info("Session completed with failure for {}: {}", sender, exc); }); return sessionReqRefPromise; } private static CompletionStage<Object> askSend( ClusterClientReceptionistServiceClient receptionistServiceClient, SendAsk send, ClusterClientSerialization serialization) { Payload payload = serialization.serializePayload(send.msg); SendReq sendReq = SendReq.newBuilder() .setPath(send.path) .setLocalAffinity(send.localAffinity) .setPayload(payload) .build(); return receptionistServiceClient.askSend(sendReq).thenApply( rsp -> serialization.deserializePayload(rsp.getPayload()) ); } private final ClusterClientSettings settings; private final Materializer materializer; private final ClusterClientReceptionistServiceClient receptionistServiceClient; private final ClusterClientSerialization serialization = new ClusterClientSerialization(getContext().getSystem()); // Original sender -> stream Source.actorRef of the session private final Map<ActorRef, CompletionStage<ActorRef>> sessionRef = new HashMap<>(); private final SharedKillSwitch killSwitch = KillSwitches.shared(getSelf().path().name()); private ClusterClient(ClusterClientSettings settings, Materializer materializer) { this.settings = settings; this.materializer = materializer; this.receptionistServiceClient = createClientStub(settings, materializer); } @Override public void postStop() throws Exception { killSwitch.shutdown(); super.postStop(); } @Override public Receive createReceive() { return ReceiveBuilder.create() .match(SendAsk.class, this::onSendAsk) .match(Command.class, this::onCommand) .match(Terminated.class, this::onTerminated) .build(); } private void onSendAsk(SendAsk send) { Patterns.pipe( ClusterClient.askSend(receptionistServiceClient, send, serialization), getContext().getDispatcher()) .to(getSender()); } private void onCommand(Command cmd) { final ActorRef originalSender = getSender(); CompletionStage<ActorRef> session = sessionRef.get(originalSender); if (session == null) { session = newSession(settings, receptionistServiceClient, originalSender, killSwitch, log(), serialization, materializer); sessionRef.put(originalSender, session); } getContext().watch(originalSender); session.thenAccept(ref -> ref.tell(cmd, ActorRef.noSender())); } private void onTerminated(Terminated terminated) { sessionRef.remove(terminated.getActor()); } }