public void onApply()

in mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java [52:105]


    /**
     * Applies every entry exposed by {@code iterator} to the state processors.
     *
     * <p>For each entry the message is taken from the entry's closure when
     * {@code iterator.done()} is non-null, otherwise it is parsed from the entry's
     * raw data. {@link WriteRequest} and {@link ReadRequest} messages are dispatched
     * to the processor registered for their category, and the resulting response is
     * handed to the entry's closure (if any). The closure, when present, is always
     * run with the entry's final status, whether processing succeeded or failed.
     *
     * <p>If processing an entry throws, the closure is notified with an
     * {@code UNKNOWN} error status, the loop stops, and the failure is logged.
     * The iterator is not rolled back (that call is currently disabled).
     *
     * @param iterator the batch of committed entries to apply
     */
    public void onApply(Iterator iterator) {
        // index/applied feed the (currently disabled) setErrorAndRollback call below.
        int index = 0;
        int applied = 0;
        try {
            while (iterator.hasNext()) {
                final Status status = Status.OK();
                // Reset per entry: a closure belongs to exactly one log entry. Keeping the
                // previous entry's closure would deliver this entry's response to the wrong
                // caller and run that closure a second time.
                MqttClosure closure = null;
                try {
                    final Message message;
                    if (iterator.done() != null) {
                        closure = (MqttClosure) iterator.done();
                        message = closure.getMessage();
                    } else {
                        // No closure attached to this entry: decode the request from the log data.
                        // NOTE(review): array() assumes a heap-backed buffer with no offset — confirm.
                        final ByteBuffer data = iterator.getData();
                        message = parseMessage(data.array());
                    }

                    LOGGER.debug("get message:{} and apply to state machine", message);

                    if (message instanceof WriteRequest) {
                        WriteRequest writeRequest = (WriteRequest) message;
                        StateProcessor processor = server.getProcessor(writeRequest.getCategory());
                        Response response = processor.onWriteRequest(writeRequest);
                        if (Objects.nonNull(closure)) {
                            closure.setResponse(response);
                        }
                    }

                    if (message instanceof ReadRequest) {
                        ReadRequest request = (ReadRequest) message;
                        StateProcessor processor = server.getProcessor(request.getCategory());
                        Response response = processor.onReadRequest(request);
                        if (Objects.nonNull(closure)) {
                            closure.setResponse(response);
                        }
                    }
                } catch (Throwable e) {
                    index++;
                    status.setError(RaftError.UNKNOWN, e.toString());
                    Optional.ofNullable(closure).ifPresent(closure1 -> closure1.setThrowable(e));
                    // Rethrow so the outer handler stops the loop and logs the failure.
                    throw e;
                } finally {
                    // Always complete this entry's closure, on success and on failure alike.
                    Optional.ofNullable(closure).ifPresent(closure1 -> closure1.run(status));
                }

                applied++;
                index++;
                iterator.next();
            }
        } catch (Throwable t) {
            LOGGER.error("stateMachine meet critical error", t);
            //iterator.setErrorAndRollback(index - applied, new Status(RaftError.ESTATEMACHINE, "StateMachine meet critical error: %s.", t.toString()));
        }
    }