in benchmark-java/src/main/java/org/apache/pekko/grpc/benchmarks/driver/LoadWorker.java [152:188]
/**
 * Handles the server-control stream of the benchmark worker.
 *
 * <p>Every incoming {@code ServerArgs} element is mapped to exactly one {@code ServerStatus}:
 * <ul>
 *   <li>{@code SETUP} while no server is registered: builds a {@code LoadServer} from the setup
 *       config (substituting this worker's {@code serverPort} when it is non-zero and the request
 *       leaves the port at 0), starts it, and replies with its port and core count.</li>
 *   <li>{@code MARK} while a server is registered: replies with the server's current stats.</li>
 * </ul>
 *
 * <p>Failure cases, all of which fail the stream:
 * <ul>
 *   <li>{@code SETUP} when a server is already registered: {@code ALREADY_EXISTS}.</li>
 *   <li>{@code MARK} before any server was set up: {@code FAILED_PRECONDITION}.</li>
 *   <li>Any other argument type: {@code INVALID_ARGUMENT}.</li>
 *   <li>Any other throwable: logged and wrapped as {@code INTERNAL} with the original as cause.</li>
 * </ul>
 *
 * @param in stream of server control messages
 * @return stream with one status reply per control message
 */
public Source<Control.ServerStatus, NotUsed> runServer(Source<ServerArgs, NotUsed> in) {
  return in.map(value -> {
    try {
      ArgtypeCase argTypeCase = value.getArgtypeCase();

      if (argTypeCase == ArgtypeCase.SETUP) {
        if (workerServer != null) {
          throw Status.ALREADY_EXISTS
              .withDescription("Server already started")
              .asRuntimeException();
        }
        ServerArgs setupArgs = value;
        if (serverPort != 0 && setupArgs.getSetup().getPort() == 0) {
          // The request did not pick a port; fall back to the one this worker was configured with.
          Control.ServerArgs.Builder builder = setupArgs.toBuilder();
          builder.getSetupBuilder().setPort(serverPort);
          setupArgs = builder.build();
        }
        // Publish the server to the field only once start() has succeeded, so that a failed
        // start does not make every later SETUP fail with "Server already started".
        LoadServer server = new LoadServer(setupArgs.getSetup());
        server.start();
        workerServer = server;
        return Control.ServerStatus.newBuilder()
            .setPort(workerServer.getPort())
            .setCores(workerServer.getCores())
            .build();
      }

      if (argTypeCase == ArgtypeCase.MARK) {
        if (workerServer == null) {
          throw Status.FAILED_PRECONDITION
              .withDescription("Server not started")
              .asRuntimeException();
        }
        return Control.ServerStatus.newBuilder()
            .setStats(workerServer.getStats())
            .build();
      }

      throw Status.INVALID_ARGUMENT
          .withDescription("Unsupported server argument type: " + argTypeCase)
          .asRuntimeException();
    } catch (Throwable t) {
      log.log(Level.WARNING, "Error running server", t);
      // FIXME: Shutdown server if we can
      if (t instanceof StatusRuntimeException || t instanceof StatusException) {
        // Already carries a gRPC status; propagate unchanged.
        throw t;
      }
      throw Status.INTERNAL.withCause(t).asException();
    }
  });
  // FIXME should also shutdown workerServer if client (upstream of in) completes, see onComplete in original code
}