private void runWorker()

in frontend/server/src/main/java/com/amazonaws/ml/mms/wlm/WorkerThread.java [140:199]


    /**
     * Main request loop of this worker thread.
     *
     * <p>While {@link #isRunning()} holds, pulls the next request for this worker's channel from
     * the aggregator, writes it to the backend channel (blocking until the write completes) and
     * waits for the backend's reply on {@code replies}. The wait is bounded by the model's response
     * timeout, interpreted in minutes. Each received reply is forwarded to the aggregator, then
     * post-processed according to the request's command:
     *
     * <ul>
     *   <li>{@code PREDICT}: resets the model's consecutive-failure counter.
     *   <li>{@code LOAD}: updates worker state and, on success, attaches the backend's
     *       stdout/stderr capture files.
     *   <li>anything else: no extra handling.
     * </ul>
     *
     * @throws WorkerInitializationException if the backend does not reply within the response
     *     timeout (the model's failed-request counter is incremented first)
     * @throws InterruptedException if the thread is interrupted while blocked on the channel write
     *     or on the reply queue
     * @throws FileNotFoundException if the stdout/stderr capture files cannot be opened after a
     *     successful model load
     */
    private void runWorker()
            throws WorkerInitializationException, InterruptedException, FileNotFoundException {
        // Timeout comes from the model configuration; it is applied in minutes below.
        int responseTimeout = model.getResponseTimeout();
        while (isRunning()) {
            req = aggregator.getRequest(backendChannel.id().asLongText(), state);
            backendChannel.writeAndFlush(req).sync();
            long begin = System.currentTimeMillis();
            ModelWorkerResponse reply = replies.poll(responseTimeout, TimeUnit.MINUTES);
            long duration = System.currentTimeMillis() - begin;
            logger.info("Backend response time: {}", duration);

            if (reply == null) {
                // poll() timed out: count the failure and abort this worker loop.
                int val = model.incrFailedInfReqs();
                logger.error("Number of consecutive unsuccessful inference {}", val);
                throw new WorkerInitializationException(
                        "Backend worker did not respond in given time");
            }
            aggregator.sendResponse(reply);

            switch (req.getCommand()) {
                case PREDICT:
                    model.resetFailedInfReqs();
                    break;
                case LOAD:
                    onModelLoadReply(reply);
                    break;
                case UNLOAD:
                case STATS:
                default:
                    break;
            }
            // The request has been answered; clear the in-flight reference.
            req = null;
        }
    }

    /**
     * Applies the backend's reply to a LOAD request.
     *
     * <p>On success (code 200) opens the per-channel stdout/stderr capture files in
     * {@code java.io.tmpdir}, marks the worker as loaded, records the backend PID parsed from the
     * reply message, attaches the capture streams to the life cycle and resets the backoff index.
     * Otherwise marks the worker as errored with the reply's status code. The capture files are
     * opened only on success so a failed load does not leave unreferenced file handles open.
     *
     * @param reply the backend reply to a LOAD request; must not be {@code null}
     * @throws FileNotFoundException if a capture file cannot be opened
     */
    private void onModelLoadReply(ModelWorkerResponse reply) throws FileNotFoundException {
        if (reply.getCode() != 200) {
            setState(WorkerState.WORKER_ERROR, HttpResponseStatus.valueOf(reply.getCode()));
            return;
        }
        String capturePrefix =
                System.getProperty("java.io.tmpdir") + '/' + backendChannel.id().asLongText();
        out = new RandomAccessFile(capturePrefix + "-stdout", "rw");
        err = new RandomAccessFile(capturePrefix + "-stderr", "rw");
        setState(WorkerState.WORKER_MODEL_LOADED, HttpResponseStatus.OK);
        lifeCycle.setPid(parsePid(reply.getMessage()));
        lifeCycle.attachIOStreams(
                threadName,
                Channels.newInputStream(out.getChannel()),
                Channels.newInputStream(err.getChannel()));
        backoffIdx = 0;
    }

    /**
     * Extracts the backend process id from a load-reply message of the form {@code ...[PID]:1234}.
     *
     * @param message the reply message; may be {@code null}
     * @return the parsed PID
     * @throws IllegalStateException if the message is {@code null} or has no {@code [PID]:} marker
     * @throws NumberFormatException if the text after the marker is not an integer
     */
    private static int parsePid(String message) {
        String marker = "[PID]:";
        int idx = message == null ? -1 : message.indexOf(marker);
        if (idx < 0) {
            throw new IllegalStateException(
                    "Backend load reply does not contain a PID: " + message);
        }
        return Integer.parseInt(message.substring(idx + marker.length()).trim());
    }