public Response onWriteRequest()

in mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/processor/WillMsgStateProcessor.java [69:96]


    /**
     * Applies a will-message write request to the RocksDB engine of the state
     * machine that owns the request's group.
     *
     * <p>Supported operations (taken from {@code log.getOperation()}):
     * <ul>
     *   <li>{@code "put"} - stores the request data under the request key;</li>
     *   <li>{@code "delete"} - removes the request key;</li>
     *   <li>{@code "compareAndPut"} - stores the request data only if the current
     *       value matches the {@code "expectValue"} entry of the request's ext-data
     *       map; an {@code expectValue} equal to {@link Constants#NOT_FOUND} is
     *       passed on as a {@code null} expected value.</li>
     * </ul>
     *
     * @param log the write request to apply
     * @return the response produced by the selected operation, or {@code null}
     *         when no state machine exists for the group or the operation is not
     *         one of the supported ones
     * @throws IllegalArgumentException if the operation is {@code "compareAndPut"}
     *         and the ext-data map has no {@code "expectValue"} entry
     * @throws Exception any failure raised while applying the request; it is
     *         logged together with the request key and then rethrown
     */
    public Response onWriteRequest(WriteRequest log) throws Exception {
        try {
            MqttStateMachine sm = server.getMqttStateMachine(log.getGroup());
            if (sm == null) {
                logger.error("Fail to process will WriteRequest , Not Found SM for {}", log.getGroup());
                return null;
            }
            String operation = log.getOperation();
            String key = log.getKey();
            byte[] value = log.getData().toByteArray();

            // NOTE(review): key.getBytes() / expectValue.getBytes() use the platform
            // default charset; left as-is so stored keys stay byte-compatible with
            // the read path - confirm every node runs with the same charset.
            if ("put".equals(operation)) {
                return put(sm.getRocksDBEngine(), key.getBytes(), value);
            }
            if ("delete".equals(operation)) {
                return delete(sm.getRocksDBEngine(), key.getBytes());
            }
            if ("compareAndPut".equals(operation)) {
                // Look the expected value up once and validate it before use, so a
                // request without it fails with a descriptive error instead of a
                // bare NullPointerException.
                String expectValue = log.getExtDataMap().get("expectValue");
                if (expectValue == null) {
                    throw new IllegalArgumentException(
                            "compareAndPut requires ext data 'expectValue', k " + key);
                }
                // NOT_FOUND is forwarded as a null expected value.
                byte[] expected = Constants.NOT_FOUND.equals(expectValue) ? null : expectValue.getBytes();
                return compareAndPut(sm.getRocksDBEngine(), key.getBytes(), expected, value);
            }

            // Unknown operation: keep returning null as before, but leave a trace.
            logger.error("Fail to process will WriteRequest, unsupported operation {}, k {}", operation, key);
        } catch (Exception e) {
            logger.error("Fail to process will WriteRequest, k {}", log.getKey(), e);
            throw e;
        }
        return null;
    }