in store/src/main/java/org/apache/rocketmq/store/CommitLog.java [1100:1259]
/**
 * Appends a batch of messages to the commit log.
 *
 * <p>The method proceeds as follows:
 * <ol>
 *   <li>Rejects the batch with {@code MESSAGE_ILLEGAL} if it carries a transaction type or a delay level.</li>
 *   <li>Marks the born/store host as IPv6 when the corresponding address is an {@link Inet6Address}.</li>
 *   <li>Works out how many HA acks are needed and returns {@code IN_SYNC_REPLICAS_NOT_ENOUGH}
 *       when the replica checks below fail.</li>
 *   <li>Selects the message version, then encodes the batch with the thread-local encoder.</li>
 *   <li>Under the topic-queue lock assigns offsets, and under {@code putMessageLock} appends the
 *       encoded batch to the last mapped file, rolling over to a new file on {@code END_OF_FILE}.</li>
 *   <li>Increases the queue offset when the append status is {@code PUT_OK}, updates the per-topic
 *       statistics and hands over to {@code handleDiskFlushAndHA}.</li>
 * </ol>
 *
 * @param messageExtBatch the batch to store; its store timestamp, version, host flags and encoded
 *                        buffer are set by this method
 * @return an already-completed future carrying a failure status for every early exit, otherwise
 *         the future returned by {@code handleDiskFlushAndHA}
 */
public CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatch messageExtBatch) {
    messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    // A batch must be a plain (non-transactional, non-delayed) message.
    final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
    if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
    }
    if (messageExtBatch.getDelayTimeLevel() > 0) {
        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
    }

    InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
    if (bornSocketAddress.getAddress() instanceof Inet6Address) {
        messageExtBatch.setBornHostV6Flag();
    }

    InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
    if (storeSocketAddress.getAddress() instanceof Inet6Address) {
        messageExtBatch.setStoreHostAddressV6Flag();
    }

    long elapsedTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

    // Current write offset of the commit log as seen before taking any lock; 0 when no file exists yet.
    long currOffset;
    if (mappedFile == null) {
        currOffset = 0;
    } else {
        currOffset = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
    }

    int needAckNums = this.defaultMessageStore.getMessageStoreConfig().getInSyncReplicas();
    boolean needHandleHA = needHandleHA(messageExtBatch);

    if (needHandleHA && this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
        if (this.defaultMessageStore.getHaService().inSyncReplicasNums(currOffset) < this.defaultMessageStore.getMessageStoreConfig().getMinInSyncReplicas()) {
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
        }
        if (this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
            // -1 means all ack in SyncStateSet
            needAckNums = MixAll.ALL_ACK_IN_SYNC_STATE_SET;
        }
    } else if (needHandleHA && this.defaultMessageStore.getBrokerConfig().isEnableSlaveActingMaster()) {
        int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
            this.defaultMessageStore.getHaService().inSyncReplicasNums(currOffset));
        needAckNums = calcNeedAckNums(inSyncReplicas);
        if (needAckNums > inSyncReplicas) {
            // Tell the producer, don't have enough slaves to handle the send request
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
        }
    }

    // Default to V1; switch to V2 only when enabled by config and the topic is longer than Byte.MAX_VALUE.
    messageExtBatch.setVersion(MessageVersion.MESSAGE_VERSION_V1);
    boolean autoMessageVersionOnTopicLen =
        this.defaultMessageStore.getMessageStoreConfig().isAutoMessageVersionOnTopicLen();
    if (autoMessageVersionOnTopicLen && messageExtBatch.getTopic().length() > Byte.MAX_VALUE) {
        messageExtBatch.setVersion(MessageVersion.MESSAGE_VERSION_V2);
    }

    // Fine-grained lock instead of the coarse-grained one: encoding happens here, before any lock is taken.
    PutMessageThreadLocal pmThreadLocal = this.putMessageThreadLocal.get();
    updateMaxMessageSize(pmThreadLocal);
    MessageExtEncoder batchEncoder = pmThreadLocal.getEncoder();

    String topicQueueKey = generateKey(pmThreadLocal.getKeyBuilder(), messageExtBatch);

    PutMessageContext putMessageContext = new PutMessageContext(topicQueueKey);
    messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext));

    topicQueueLock.lock(topicQueueKey);
    try {
        defaultMessageStore.assignOffset(messageExtBatch);

        putMessageLock.lock();
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Re-stamp the batch with the time taken under the lock, in order to keep
            // store timestamps globally ordered.
            messageExtBatch.setStoreTimestamp(beginLockTimestamp);

            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
                // Only tune the read mode of a file that was actually obtained; the null case
                // is reported right below, mirroring the END_OF_FILE branch.
                if (null != mappedFile && isCloseReadAhead()) {
                    setFileReadMode(mappedFile, LibC.MADV_RANDOM);
                }
            }
            if (null == mappedFile) {
                log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null));
            }

            result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    // Remember the exhausted file so it can be unlocked after the locks are released.
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
                    }
                    if (isCloseReadAhead()) {
                        setFileReadMode(mappedFile, LibC.MADV_RANDOM);
                    }
                    // NOTE(review): the status of this retry is not inspected; a non-PUT_OK retry
                    // still reaches the PUT_OK result built at the end of the method - confirm intended.
                    result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
                case UNKNOWN_ERROR:
                default:
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            }

            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        } finally {
            // Clear the in-lock marker on every exit path (normal, early return or exception),
            // and do it before unlocking so the next lock holder's value is never overwritten.
            this.beginTimeInLock = 0;
            putMessageLock.unlock();
        }

        // Increase queue offset when messages are successfully written
        if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
            this.defaultMessageStore.increaseOffset(messageExtBatch, (short) putMessageContext.getBatchSize());
        }
    } catch (RocksDBException e) {
        // Do not swallow the failure silently: keep the cause in the log before reporting UNKNOWN_ERROR.
        log.error("Put batch messages failed with RocksDBException, topic: {} clientAddr: {}",
            messageExtBatch.getTopic(), messageExtBatch.getBornHostString(), e);
        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
    } finally {
        topicQueueLock.unlock(topicQueueKey);
    }

    // Warn when the append held putMessageLock for more than 500 ms.
    if (elapsedTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result);
    }

    // The file that hit END_OF_FILE is unlocked here, outside both locks, when warm-up is enabled.
    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(result.getMsgNum());
    storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(result.getWroteBytes());

    return handleDiskFlushAndHA(putMessageResult, messageExtBatch, needAckNums, needHandleHA);
}