in java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java [227:303]
/**
 * Builds a {@link MessageViewImpl} from its protobuf representation.
 *
 * <p>The body checksum is verified against the digest carried in the system properties, then the body is
 * decoded according to the declared body encoding. A checksum mismatch, an unavailable digest algorithm,
 * or a failed decompression marks the resulting view as corrupted instead of throwing.
 *
 * @param message                    protobuf message to convert.
 * @param mq                         message queue handed to the constructor unchanged.
 * @param transportDeliveryTimestamp timestamp handed to the constructor unchanged.
 * @return the message view built from {@code message}.
 */
public static MessageViewImpl fromProtobuf(Message message, MessageQueueImpl mq,
    Long transportDeliveryTimestamp) {
    final SystemProperties systemProperties = message.getSystemProperties();
    final String topic = message.getTopic().getName();
    final MessageId messageId = MessageIdCodec.getInstance().decode(systemProperties.getMessageId());
    final Digest bodyDigest = systemProperties.getBodyDigest();
    byte[] body = message.getBody().toByteArray();
    boolean corrupted = false;
    final String checksum = bodyDigest.getChecksum();
    String expectedChecksum;
    final DigestType digestType = bodyDigest.getType();
    // The checksum is computed over the body as transported, i.e. before any decoding below.
    switch (digestType) {
        case CRC32:
            expectedChecksum = Utilities.crc32CheckSum(body);
            if (!expectedChecksum.equals(checksum)) {
                corrupted = true;
            }
            break;
        case MD5:
            try {
                expectedChecksum = Utilities.md5CheckSum(body);
                if (!expectedChecksum.equals(checksum)) {
                    corrupted = true;
                }
            } catch (NoSuchAlgorithmException e) {
                // The body cannot be verified, so it is treated as corrupted; keep the cause in the log.
                corrupted = true;
                log.error("MD5 is not supported unexpectedly, skip it, topic={}, messageId={}", topic,
                    messageId, e);
            }
            break;
        case SHA1:
            try {
                expectedChecksum = Utilities.sha1CheckSum(body);
                if (!expectedChecksum.equals(checksum)) {
                    corrupted = true;
                }
            } catch (NoSuchAlgorithmException e) {
                // The body cannot be verified, so it is treated as corrupted; keep the cause in the log.
                corrupted = true;
                log.error("SHA-1 is not supported unexpectedly, skip it, topic={}, messageId={}", topic,
                    messageId, e);
            }
            break;
        default:
            // An unsupported digest type is only reported; the message is not flagged as corrupted.
            log.error("Unsupported message body digest algorithm, digestType={}, topic={}, messageId={}",
                digestType, topic, messageId);
    }
    final Encoding bodyEncoding = systemProperties.getBodyEncoding();
    switch (bodyEncoding) {
        case GZIP:
            try {
                body = Utilities.decompressBytes(body);
            } catch (IOException e) {
                // On failure the body stays as received and the message is flagged as corrupted.
                log.error("Failed to uncompress message body, topic={}, messageId={}", topic, messageId, e);
                corrupted = true;
            }
            break;
        case IDENTITY:
            break;
        default:
            // An unsupported encoding is only reported; the body is passed through as received.
            log.error("Unsupported message encoding algorithm, topic={}, messageId={}, bodyEncoding={}", topic,
                messageId, bodyEncoding);
    }
    // Optional system properties are mapped to null when absent.
    String tag = systemProperties.hasTag() ? systemProperties.getTag() : null;
    String messageGroup = systemProperties.hasMessageGroup() ? systemProperties.getMessageGroup() : null;
    Long deliveryTimestamp = systemProperties.hasDeliveryTimestamp() ?
        Timestamps.toMillis(systemProperties.getDeliveryTimestamp()) : null;
    final ProtocolStringList keys = systemProperties.getKeysList();
    final String bornHost = systemProperties.getBornHost();
    final long bornTimestamp = Timestamps.toMillis(systemProperties.getBornTimestamp());
    final int deliveryAttempt = systemProperties.getDeliveryAttempt();
    final long offset = systemProperties.getQueueOffset();
    final Map<String, String> properties = message.getUserPropertiesMap();
    final String receiptHandle = systemProperties.getReceiptHandle();
    return new MessageViewImpl(messageId, topic, body, tag, messageGroup, deliveryTimestamp, keys, properties,
        bornHost, bornTimestamp, deliveryAttempt, mq, receiptHandle, offset, corrupted, transportDeliveryTimestamp);
}