in mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttStateMachine.java [52:105]
public void onApply(Iterator iterator) {
int index = 0;
int applied = 0;
Message message;
MqttClosure closure = null;
try {
while (iterator.hasNext()) {
Status status = Status.OK();
try {
if (iterator.done() != null) {
closure = (MqttClosure) iterator.done();
message = closure.getMessage();
} else {
final ByteBuffer data = iterator.getData();
message = parseMessage(data.array());
}
LOGGER.debug("get message:{} and apply to state machine", message);
if (message instanceof WriteRequest) {
WriteRequest writeRequest = (WriteRequest) message;
StateProcessor processor = server.getProcessor(writeRequest.getCategory());
Response response = processor.onWriteRequest((WriteRequest) message);
if (Objects.nonNull(closure)) {
closure.setResponse(response);
}
}
if (message instanceof ReadRequest) {
ReadRequest request = (ReadRequest) message;
StateProcessor processor = server.getProcessor(request.getCategory());
Response response = processor.onReadRequest((ReadRequest) message);
if (Objects.nonNull(closure)) {
closure.setResponse(response);
}
}
} catch (Throwable e) {
index++;
status.setError(RaftError.UNKNOWN, e.toString());
Optional.ofNullable(closure).ifPresent(closure1 -> closure1.setThrowable(e));
throw e;
} finally {
Optional.ofNullable(closure).ifPresent(closure1 -> closure1.run(status));
}
applied++;
index++;
iterator.next();
}
} catch (Throwable t) {
LOGGER.error("stateMachine meet critical error", t);
//iterator.setErrorAndRollback(index - applied, new Status(RaftError.ESTATEMACHINE, "StateMachine meet critical error: %s.", t.toString()));
}
}