in store/src/main/java/org/apache/rocketmq/store/CommitLog.java [435:644]
/**
 * Parses one message starting at the current position of {@code byteBuffer}, validates it, and
 * builds the {@link DispatchRequest} describing it.
 *
 * <p>Outcomes visible in this method:
 * <ul>
 *   <li>{@code DispatchRequest(-1, false)}: not enough bytes remaining, unknown magic code,
 *       CRC mismatch, missing/invalid dup info, or any exception thrown while parsing;</li>
 *   <li>{@code DispatchRequest(0, true)}: the record carries {@code BLANK_MAGIC_CODE};</li>
 *   <li>{@code DispatchRequest(totalSize, false)}: the length recomputed from the parsed fields
 *       does not equal the {@code totalSize} stored in the record;</li>
 *   <li>a fully populated request otherwise.</li>
 * </ul>
 *
 * <p>The position of {@code byteBuffer} is advanced as fields are read; it is not restored on failure.
 *
 * @param byteBuffer   buffer positioned at the first byte (the total-size field) of a message
 * @param checkCRC     whether to verify a CRC. With {@code forceVerifyPropCRC} disabled the body CRC is
 *                     verified, but only if {@code readBody} is {@code true} and the body is non-empty;
 *                     with it enabled the CRC stored in the message properties is verified instead
 * @param checkDupInfo whether to require a {@code DUP_INFO} property made of exactly two
 *                     {@code '_'}-separated parts (only evaluated when the message has properties)
 * @param readBody     whether to copy the body out of the buffer; when {@code false} the body is skipped
 * @return the dispatch request, never {@code null}
 */
public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,
    final boolean checkDupInfo, final boolean readBody) {
    try {
        // Need more than the 4-byte total-size field to have anything to parse.
        if (byteBuffer.remaining() <= 4) {
            return new DispatchRequest(-1, false /* fail */);
        }
        // 1 TOTAL SIZE
        int totalSize = byteBuffer.getInt();
        // totalSize includes the 4 bytes just consumed.
        if (byteBuffer.remaining() < totalSize - 4) {
            return new DispatchRequest(-1, false /* fail */);
        }
        // 2 MAGIC CODE
        int magicCode = byteBuffer.getInt();
        switch (magicCode) {
            case MessageDecoder.MESSAGE_MAGIC_CODE:
            case MessageDecoder.MESSAGE_MAGIC_CODE_V2:
                break;
            case BLANK_MAGIC_CODE:
                return new DispatchRequest(0, true /* success */);
            default:
                log.warn("found a illegal magic code 0x{}", Integer.toHexString(magicCode));
                return new DispatchRequest(-1, false /* fail */);
        }
        MessageVersion messageVersion = MessageVersion.valueOfMagicCode(magicCode);
        // Scratch array reused for every variable-length field below; totalSize bounds them all.
        byte[] bytesContent = new byte[totalSize];
        int bodyCRC = byteBuffer.getInt();
        int queueId = byteBuffer.getInt();
        int flag = byteBuffer.getInt();
        long queueOffset = byteBuffer.getLong();
        long physicOffset = byteBuffer.getLong();
        int sysFlag = byteBuffer.getInt();
        long bornTimeStamp = byteBuffer.getLong();
        // Born host: 8 bytes, or 20 bytes when BORNHOST_V6_FLAG is set
        // (presumably address + 4-byte port; confirm against the encoder).
        ByteBuffer byteBuffer1;
        if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) {
            byteBuffer1 = byteBuffer.get(bytesContent, 0, 4 + 4);
        } else {
            byteBuffer1 = byteBuffer.get(bytesContent, 0, 16 + 4);
        }
        long storeTimestamp = byteBuffer.getLong();
        // Store host: same 8/20-byte layout, selected by STOREHOSTADDRESS_V6_FLAG.
        ByteBuffer byteBuffer2;
        if ((sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
            byteBuffer2 = byteBuffer.get(bytesContent, 0, 4 + 4);
        } else {
            byteBuffer2 = byteBuffer.get(bytesContent, 0, 16 + 4);
        }
        int reconsumeTimes = byteBuffer.getInt();
        long preparedTransactionOffset = byteBuffer.getLong();
        int bodyLen = byteBuffer.getInt();
        if (bodyLen > 0) {
            if (readBody) {
                byteBuffer.get(bytesContent, 0, bodyLen);
                if (checkCRC) {
                    /*
                     * When the forceVerifyPropCRC = false,
                     * use original bodyCrc validation.
                     */
                    if (!this.defaultMessageStore.getMessageStoreConfig().isForceVerifyPropCRC()) {
                        int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
                        if (crc != bodyCRC) {
                            // Arguments follow the placeholder order: stored CRC first, computed CRC second.
                            log.warn("CRC check failed. bodyCRC={}, currentCRC={}", bodyCRC, crc);
                            return new DispatchRequest(-1, false /* fail */);
                        }
                    }
                }
            } else {
                // Body not wanted: skip over it without copying.
                byteBuffer.position(byteBuffer.position() + bodyLen);
            }
        }
        // The width of the topic-length field depends on the message version.
        int topicLen = messageVersion.getTopicLength(byteBuffer);
        byteBuffer.get(bytesContent, 0, topicLen);
        String topic = new String(bytesContent, 0, topicLen, MessageDecoder.CHARSET_UTF8);
        long tagsCode = 0;
        String keys = "";
        String uniqKey = null;
        short propertiesLength = byteBuffer.getShort();
        Map<String, String> propertiesMap = null;
        if (propertiesLength > 0) {
            byteBuffer.get(bytesContent, 0, propertiesLength);
            String properties = new String(bytesContent, 0, propertiesLength, MessageDecoder.CHARSET_UTF8);
            propertiesMap = MessageDecoder.string2messageProperties(properties);
            // NOTE: overwrites the "" default, so keys becomes null when the property is absent.
            keys = propertiesMap.get(MessageConst.PROPERTY_KEYS);
            uniqKey = propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (checkDupInfo) {
                String dupInfo = propertiesMap.get(MessageConst.DUP_INFO);
                if (null == dupInfo || dupInfo.split("_").length != 2) {
                    log.warn("DupInfo in properties check failed. dupInfo={}", dupInfo);
                    return new DispatchRequest(-1, false /* fail */);
                }
            }
            String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
            if (!Strings.isNullOrEmpty(tags)) {
                tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
            }
            // Timing message processing
            {
                String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
                if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {
                    int delayLevel = Integer.parseInt(t);
                    // Clamp to the highest level the store supports.
                    if (delayLevel > this.defaultMessageStore.getMaxDelayLevel()) {
                        delayLevel = this.defaultMessageStore.getMaxDelayLevel();
                    }
                    // For messages in the schedule topic, tagsCode is replaced by the deliver timestamp.
                    if (delayLevel > 0) {
                        tagsCode = this.defaultMessageStore.computeDeliverTimestamp(delayLevel,
                            storeTimestamp);
                    }
                }
            }
        }
        if (checkCRC) {
            /*
             * When the forceVerifyPropCRC = true,
             * Crc verification needs to be performed on the entire message data (excluding the length reserved at the tail)
             */
            if (this.defaultMessageStore.getMessageStoreConfig().isForceVerifyPropCRC()) {
                int expectedCRC = -1;
                if (propertiesMap != null) {
                    String crc32Str = propertiesMap.get(MessageConst.PROPERTY_CRC32);
                    if (crc32Str != null) {
                        expectedCRC = 0;
                        // Decimal digits are consumed from the last character to the first, i.e. the
                        // property value is read least-significant digit first.
                        // NOTE(review): this must mirror how the writer encodes the property; confirm.
                        for (int i = crc32Str.length() - 1; i >= 0; i--) {
                            int num = crc32Str.charAt(i) - '0';
                            expectedCRC *= 10;
                            expectedCRC += num;
                        }
                    }
                }
                if (expectedCRC >= 0) {
                    // Work on a duplicate so the caller-visible position/limit stay untouched.
                    // Rewinding by totalSize assumes the position now sits at the end of this message.
                    ByteBuffer tmpBuffer = byteBuffer.duplicate();
                    tmpBuffer.position(tmpBuffer.position() - totalSize);
                    tmpBuffer.limit(tmpBuffer.position() + totalSize - CommitLog.CRC32_RESERVED_LEN);
                    int crc = UtilAll.crc32(tmpBuffer);
                    if (crc != expectedCRC) {
                        // Report the value actually compared against (expectedCRC), not bodyCRC.
                        log.warn(
                            "CommitLog#checkAndDispatchMessage: failed to check message CRC, expected "
                                + "CRC={}, actual CRC={}", expectedCRC, crc);
                        return new DispatchRequest(-1, false /* fail */);
                    }
                } else {
                    log.warn(
                        "CommitLog#checkAndDispatchMessage: failed to check message CRC, not found CRC in properties");
                    return new DispatchRequest(-1, false /* fail */);
                }
            }
        }
        // Cross-check: the length implied by the parsed fields must equal the stored total size.
        int readLength = MessageExtEncoder.calMsgLength(messageVersion, sysFlag, bodyLen, topicLen, propertiesLength);
        if (totalSize != readLength) {
            // Keeps the otherwise-unused parsed locals referenced
            // (presumably to silence unused-variable warnings; see doNothingForDeadCode).
            doNothingForDeadCode(reconsumeTimes);
            doNothingForDeadCode(flag);
            doNothingForDeadCode(bornTimeStamp);
            doNothingForDeadCode(byteBuffer1);
            doNothingForDeadCode(byteBuffer2);
            log.error(
                "[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",
                totalSize, readLength, bodyLen, topicLen, propertiesLength);
            return new DispatchRequest(totalSize, false /* fail */);
        }
        DispatchRequest dispatchRequest = new DispatchRequest(
            topic,
            queueId,
            physicOffset,
            totalSize,
            tagsCode,
            storeTimestamp,
            queueOffset,
            keys,
            uniqKey,
            sysFlag,
            preparedTransactionOffset,
            propertiesMap
        );
        setBatchSizeIfNeeded(propertiesMap, dispatchRequest);
        return dispatchRequest;
    } catch (Exception e) {
        // Any parsing failure (e.g. buffer underflow, malformed number) is reported as a failed request.
        log.error("checkMessageAndReturnSize failed, may can not dispatch", e);
    }
    return new DispatchRequest(-1, false /* fail */);
}