public Source runServer()

in benchmark-java/src/main/java/org/apache/pekko/grpc/benchmarks/driver/LoadWorker.java [152:188]


    public Source<Control.ServerStatus, NotUsed> runServer(Source<ServerArgs, NotUsed> in) {
      return in.map(value -> {
        try {
          ArgtypeCase argTypeCase = value.getArgtypeCase();
          if (argTypeCase == ServerArgs.ArgtypeCase.SETUP && workerServer == null) {
            if (serverPort != 0 && value.getSetup().getPort() == 0) {
              Control.ServerArgs.Builder builder = value.toBuilder();
              builder.getSetupBuilder().setPort(serverPort);
              value = builder.build();
            }
            workerServer = new LoadServer(value.getSetup());
            workerServer.start();
            return Control.ServerStatus.newBuilder()
                .setPort(workerServer.getPort())
                .setCores(workerServer.getCores())
                .build();
          } else if (argTypeCase == ArgtypeCase.MARK && workerServer != null) {
            return Control.ServerStatus.newBuilder()
                .setStats(workerServer.getStats())
                .build();
          } else {
            throw Status.ALREADY_EXISTS
                .withDescription("Server already started")
                .asRuntimeException();
          }
        } catch (Throwable t) {
          log.log(Level.WARNING, "Error running server", t);
          if (!(t instanceof StatusRuntimeException) && !(t instanceof StatusException))
            throw Status.INTERNAL.withCause(t).asException();
          else
            throw t;
          // FIXME: Shutdown server if we can
        }
      });

      // FIXME should also shutdown workerServer if client (upstream of in) completes, see onComplete in original code
    }