in java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java [40:74]
/**
 * Wraps a user message for publishing: validates the body size, assigns a message id and
 * resolves the {@link MessageType} from the message attributes and the transaction flag.
 *
 * <p>Type resolution:
 * <ul>
 *   <li>not transactional, message group present: {@code FIFO} (the group takes precedence
 *       even if a delivery timestamp is also present);</li>
 *   <li>not transactional, only delivery timestamp present: {@code DELAY};</li>
 *   <li>not transactional, neither present: {@code NORMAL};</li>
 *   <li>transactional, neither present: {@code TRANSACTION}.</li>
 * </ul>
 *
 * @param message            the message to publish.
 * @param publishingSettings settings providing the maximum allowed body size in bytes.
 * @param txEnabled          whether the message is sent with transaction semantics.
 * @throws IOException              if the remaining body bytes exceed the configured maximum.
 * @throws IllegalArgumentException if {@code txEnabled} is set while the message carries a
 *                                  message group or a delivery timestamp.
 */
public PublishingMessageImpl(Message message, PublishingSettings publishingSettings, boolean txEnabled)
    throws IOException {
    super(message);

    // Reject oversized bodies before doing anything else.
    final int bodySize = message.getBody().remaining();
    final int bodyLimit = publishingSettings.getMaxBodySizeBytes();
    if (bodySize > bodyLimit) {
        throw new IOException("Message body size exceeds the threshold, max size=" + bodyLimit + " bytes");
    }

    // Generate message id.
    this.messageId = MessageIdCodec.getInstance().nextMessageId();

    final boolean grouped = message.getMessageGroup().isPresent();
    final boolean scheduled = message.getDeliveryTimestamp().isPresent();

    if (!txEnabled) {
        // Non-transactional: a message group wins over a delivery timestamp.
        if (grouped) {
            messageType = MessageType.FIFO;
        } else if (scheduled) {
            messageType = MessageType.DELAY;
        } else {
            messageType = MessageType.NORMAL;
        }
        return;
    }

    // Transaction semantics is conflicted with fifo/delay.
    if (grouped || scheduled) {
        throw new IllegalArgumentException("Transactional message should not set messageGroup or deliveryTimestamp");
    }
    messageType = MessageType.TRANSACTION;
}