in rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java [165:212]
private static Message getAndWrapMessage(String destination, MessageHeaders headers, byte[] payloads) {
if (destination == null || destination.length() < 1) {
return null;
}
if (payloads == null || payloads.length < 1) {
return null;
}
String[] tempArr = destination.split(":", 2);
String topic = tempArr[0];
String tags = "";
if (tempArr.length > 1) {
tags = tempArr[1];
}
Message rocketMsg = new Message(topic, tags, payloads);
if (Objects.nonNull(headers) && !headers.isEmpty()) {
Object keys = headers.get(RocketMQHeaders.KEYS);
// if headers not have 'KEYS', try add prefix when getting keys
if (ObjectUtils.isEmpty(keys)) {
keys = headers.get(toRocketHeaderKey(RocketMQHeaders.KEYS));
}
if (!ObjectUtils.isEmpty(keys)) { // if headers has 'KEYS', set rocketMQ message key
rocketMsg.setKeys(keys.toString());
}
Object flagObj = headers.getOrDefault("FLAG", "0");
int flag = 0;
try {
flag = Integer.parseInt(flagObj.toString());
} catch (NumberFormatException e) {
// Ignore it
if (log.isInfoEnabled()) {
log.info("flag must be integer, flagObj:{}", flagObj);
}
}
rocketMsg.setFlag(flag);
Object waitStoreMsgOkObj = headers.getOrDefault("WAIT_STORE_MSG_OK", "true");
rocketMsg.setWaitStoreMsgOK(!waitStoreMsgOkObj.equals("false"));
headers.entrySet().stream()
.filter(entry -> !Objects.equals(entry.getKey(), "FLAG")
&& !Objects.equals(entry.getKey(), "WAIT_STORE_MSG_OK")) // exclude "FLAG", "WAIT_STORE_MSG_OK"
.forEach(entry -> {
if (!MessageConst.STRING_HASH_SET.contains(entry.getKey())) {
rocketMsg.putUserProperty(entry.getKey(), String.valueOf(entry.getValue()));
}
});
}
return rocketMsg;
}