in mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/rpc/AbstractRpcProcessor.java [107:150]
public void handleReadIndex(final MqttRaftServer server, final String group, final RpcContext rpcCtx, ReadRequest request) {
try {
final Node node = server.getNode(group);
if (Objects.isNull(node)) {
rpcCtx.sendResponse(Response.newBuilder().setSuccess(false)
.setErrMsg("Could not find the corresponding Raft Group : " + group).build());
return;
}
final StateProcessor processor = server.getProcessor(request.getCategory());
if (Objects.isNull(processor)) {
rpcCtx.sendResponse(Response.newBuilder().setSuccess(false)
.setErrMsg("Could not find the StateProcessor: " + group).build());
return;
}
try {
node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
if (status.isOk()) {
try {
Response response = processor.onReadRequest(request);
rpcCtx.sendResponse(response);
} catch (Throwable t) {
LOGGER.info("process read request in handleReadIndex error : {}", t.toString());
rpcCtx.sendResponse(Response.newBuilder().setErrMsg(t.toString()).setSuccess(false).build());
}
return;
}
LOGGER.error("ReadIndex has error : {}, go to Leader read.", status.getErrorMsg());
readFromLeader(server, group, rpcCtx, request);
}
});
} catch (Throwable e) {
LOGGER.error("ReadIndex has error : {}, go to Leader read.", e.toString());
// run raft read
readFromLeader(server, group, rpcCtx, request);
}
} catch (Throwable e) {
LOGGER.error("handleReadIndex has error : ", e);
rpcCtx.sendResponse(Response.newBuilder().setSuccess(false).setErrMsg(e.toString()).build());
}
}