public void handleReadIndex()

in mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/AbstractRpcProcessor.java [107:150]


    /**
     * Serves a read request through the Raft node's {@code readIndex} call.
     *
     * <p>The node for {@code group} and the {@link StateProcessor} for the request's category are
     * resolved first; if either is missing, a failure response is sent and the method returns.
     * Otherwise {@code node.readIndex} is invoked with a closure that, on an OK status, passes the
     * request to {@code processor.onReadRequest} and sends the resulting response. If the status is
     * not OK, or {@code readIndex} itself throws, the request is handed to
     * {@code readFromLeader}.
     *
     * <p>Every outcome handled here is reported through {@code rpcCtx.sendResponse}; failures are
     * sent as a response with {@code success=false} and an error message.
     *
     * @param server  server used to look up the Raft node and the state processor
     * @param group   name of the Raft group whose node should serve the read
     * @param rpcCtx  RPC context used to send the response back to the caller
     * @param request the read request; its category selects the state processor
     */
    public void handleReadIndex(final MqttRaftServer server, final String group, final RpcContext rpcCtx, ReadRequest request) {
        try {
            final Node node = server.getNode(group);
            if (Objects.isNull(node)) {
                rpcCtx.sendResponse(Response.newBuilder().setSuccess(false)
                        .setErrMsg("Could not find the corresponding Raft Group : " + group).build());
                return;
            }

            final StateProcessor processor = server.getProcessor(request.getCategory());
            if (Objects.isNull(processor)) {
                // The processor is looked up by category, so report the category (not the group).
                rpcCtx.sendResponse(Response.newBuilder().setSuccess(false)
                        .setErrMsg("Could not find the StateProcessor: " + request.getCategory()).build());
                return;
            }
            try {
                node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
                    @Override
                    public void run(Status status, long index, byte[] reqCtx) {
                        if (status.isOk()) {
                            try {
                                Response response = processor.onReadRequest(request);
                                rpcCtx.sendResponse(response);
                            } catch (Throwable t) {
                                // Log at error level with the throwable so the stack trace is kept.
                                LOGGER.error("process read request in handleReadIndex error : ", t);
                                rpcCtx.sendResponse(Response.newBuilder().setErrMsg(t.toString()).setSuccess(false).build());
                            }
                            return;
                        }
                        // readIndex did not succeed: fall back to reading through the leader.
                        LOGGER.error("ReadIndex has error : {}, go to Leader read.", status.getErrorMsg());
                        readFromLeader(server, group, rpcCtx, request);
                    }
                });
            } catch (Throwable e) {
                // readIndex threw before the closure could run: keep the stack trace and fall back
                // to reading through the leader.
                LOGGER.error("ReadIndex has error, go to Leader read.", e);
                readFromLeader(server, group, rpcCtx, request);
            }

        } catch (Throwable e) {
            // Last-resort handler for failures on the calling thread (lookups, fallback read).
            LOGGER.error("handleReadIndex has error : ", e);
            rpcCtx.sendResponse(Response.newBuilder().setSuccess(false).setErrMsg(e.toString()).build());
        }
    }