in store/src/main/java/org/apache/rocketmq/store/CommitLog.java [923:1098]
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// Set the storage time
if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
msg.setStoreTimestamp(System.currentTimeMillis());
}
// Set the message body CRC (consider the most appropriate setting on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
if (enabledAppendPropCRC) {
// delete crc32 properties if exist
msg.deleteProperty(MessageConst.PROPERTY_CRC32);
}
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
msg.setVersion(MessageVersion.MESSAGE_VERSION_V1);
boolean autoMessageVersionOnTopicLen =
this.defaultMessageStore.getMessageStoreConfig().isAutoMessageVersionOnTopicLen();
if (autoMessageVersionOnTopicLen && topic.length() > Byte.MAX_VALUE) {
msg.setVersion(MessageVersion.MESSAGE_VERSION_V2);
}
InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) {
msg.setBornHostV6Flag();
}
InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
msg.setStoreHostAddressV6Flag();
}
PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
updateMaxMessageSize(putMessageThreadLocal);
String topicQueueKey = generateKey(putMessageThreadLocal.getKeyBuilder(), msg);
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
long currOffset;
if (mappedFile == null) {
currOffset = 0;
} else {
currOffset = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
}
int needAckNums = this.defaultMessageStore.getMessageStoreConfig().getInSyncReplicas();
boolean needHandleHA = needHandleHA(msg);
if (needHandleHA && this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
if (this.defaultMessageStore.getHaService().inSyncReplicasNums(currOffset) < this.defaultMessageStore.getMessageStoreConfig().getMinInSyncReplicas()) {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
}
if (this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
// -1 means all ack in SyncStateSet
needAckNums = MixAll.ALL_ACK_IN_SYNC_STATE_SET;
}
} else if (needHandleHA && this.defaultMessageStore.getBrokerConfig().isEnableSlaveActingMaster()) {
int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
this.defaultMessageStore.getHaService().inSyncReplicasNums(currOffset));
needAckNums = calcNeedAckNums(inSyncReplicas);
if (needAckNums > inSyncReplicas) {
// Tell the producer, don't have enough slaves to handle the send request
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
}
}
topicQueueLock.lock(topicQueueKey);
try {
boolean needAssignOffset = true;
if (defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()
&& defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
needAssignOffset = false;
}
if (needAssignOffset) {
defaultMessageStore.assignOffset(msg);
}
PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
if (encodeResult != null) {
return CompletableFuture.completedFuture(encodeResult);
}
msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
PutMessageContext putMessageContext = new PutMessageContext(topicQueueKey);
putMessageLock.lock(); //spin or ReentrantLock, depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
msg.setStoreTimestamp(beginLockTimestamp);
}
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
if (isCloseReadAhead()) {
setFileReadMode(mappedFile, LibC.MADV_RANDOM);
}
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null));
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
switch (result.getStatus()) {
case PUT_OK:
onCommitLogAppend(msg, result, mappedFile);
break;
case END_OF_FILE:
onCommitLogAppend(msg, result, mappedFile);
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
}
if (isCloseReadAhead()) {
setFileReadMode(mappedFile, LibC.MADV_RANDOM);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
onCommitLogAppend(msg, result, mappedFile);
}
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
case UNKNOWN_ERROR:
default:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
// Increase queue offset when messages are successfully written
if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
this.defaultMessageStore.increaseOffset(msg, getMessageNum(msg));
}
} catch (RocksDBException e) {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
} finally {
topicQueueLock.unlock(topicQueueKey);
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());
return handleDiskFlushAndHA(putMessageResult, msg, needAckNums, needHandleHA);
}