in mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/WillMsgStateProcessor.java [69:96]
public Response onWriteRequest(WriteRequest log) throws Exception {
try {
MqttStateMachine sm = server.getMqttStateMachine(log.getGroup());
if (sm == null) {
logger.error("Fail to process will WriteRequest , Not Found SM for {}", log.getGroup());
return null;
}
String operation = log.getOperation();
String key = log.getKey();
byte[] value = log.getData().toByteArray();
if ("put".equals(operation)) {
return put(sm.getRocksDBEngine(), key.getBytes(), value);
} else if ("delete".equals(operation)) {
return delete(sm.getRocksDBEngine(), key.getBytes());
} else if ("compareAndPut".equals(operation)) {
String expectValue = log.getExtDataMap().get("expectValue");
if (Constants.NOT_FOUND.equals(expectValue)) {
return compareAndPut(sm.getRocksDBEngine(), key.getBytes(), null, value);
}
return compareAndPut(sm.getRocksDBEngine(), key.getBytes(), log.getExtDataMap().get("expectValue").getBytes(), value);
}
} catch (Exception e) {
logger.error("Fail to process will WriteRequest, k {}", log.getKey(), e);
throw e;
}
return null;
}