in tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java [125:329]
public CompletableFuture<Boolean> doScheduleDispatch(FlatFileInterface flatFile, boolean force) {
if (stopped) {
return CompletableFuture.completedFuture(true);
}
String topic = flatFile.getMessageQueue().getTopic();
int queueId = flatFile.getMessageQueue().getQueueId();
// When tiered store group commit is disabled, or 'force' is passed as true
// (e.g. in tests), commit the cached data directly instead of waiting for
// the group commit conditions.
force = !storeConfig.isTieredStoreGroupCommit() || force;
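// A forced dispatch blocks until the file lock is acquired; a scheduled one skips this round if the lock is busy.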
if (force) {
flatFile.getFileLock().lock();
} else {
if (!flatFile.getFileLock().tryLock()) {
return CompletableFuture.completedFuture(false);
}
}
try {
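// Destroy the flat file and stop dispatching when the topic is excluded by the filter.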
if (topicFilter != null && topicFilter.filterTopic(topic)) {
flatFileStore.destroyFile(flatFile.getMessageQueue());
return CompletableFuture.completedFuture(false);
}
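// Snapshot the flat file dispatch/commit progress and the offset range of the local consume queue.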
long currentOffset = flatFile.getConsumeQueueMaxOffset();
long commitOffset = flatFile.getConsumeQueueCommitOffset();
long minOffsetInQueue = defaultStore.getMinOffsetInQueue(topic, queueId);
long maxOffsetInQueue = defaultStore.getMaxOffsetInQueue(topic, queueId);
// Initialize from a recent timestamp instead of the max offset; starting at the max offset could skip messages that were already written.
if (!flatFile.isFlatFileInit()) {
currentOffset = defaultStore.getOffsetInQueueByTime(
topic, queueId, System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(2));
currentOffset = Math.max(currentOffset, minOffsetInQueue);
currentOffset = Math.min(currentOffset, maxOffsetInQueue);
flatFile.initOffset(currentOffset);
log.warn("MessageDispatcher#dispatch init, topic={}, queueId={}, offset={}-{}, current={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset);
return CompletableFuture.completedFuture(true);
}
// The previous commit has not caught up; retry the commit directly and skip this dispatch round.
if (commitOffset < currentOffset) {
this.commitAsync(flatFile).whenComplete((result, throwable) -> {
if (throwable != null) {
log.error("MessageDispatcher#flatFile commitOffset less than currentOffset, commitAsync again failed. topic: {}, queueId: {} ", topic, queueId, throwable);
}
});
return CompletableFuture.completedFuture(false);
}
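// If an earlier group commit failed but its data has since been committed, build its deferred index entries now.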
if (failedGroupCommitMap.containsKey(flatFile)) {
GroupCommitContext failedCommit = failedGroupCommitMap.get(flatFile);
if (failedCommit.getEndOffset() <= commitOffset) {
failedGroupCommitMap.remove(flatFile);
constructIndexFile(flatFile.getTopicId(), failedCommit);
}
}
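// The dispatch offset must stay within [minOffsetInQueue, maxOffsetInQueue]: rebuild the flat file if it fell behind, back off if it is ahead.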
if (currentOffset < minOffsetInQueue) {
log.warn("MessageDispatcher#dispatch, current offset is too small, topic={}, queueId={}, offset={}-{}, current={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset);
flatFileStore.destroyFile(flatFile.getMessageQueue());
flatFileStore.computeIfAbsent(new MessageQueue(topic, brokerName, queueId));
return CompletableFuture.completedFuture(true);
}
if (currentOffset > maxOffsetInQueue) {
log.warn("MessageDispatcher#dispatch, current offset is too large, topic={}, queueId={}, offset={}-{}, current={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset);
return CompletableFuture.completedFuture(false);
}
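// Roll the flat commit log file when the configured rolling interval has elapsed.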
long interval = TimeUnit.HOURS.toMillis(storeConfig.getCommitLogRollingInterval());
if (flatFile.rollingFile(interval)) {
log.info("MessageDispatcher#dispatch, rolling file, topic={}, queueId={}, offset={}-{}, current={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset);
}
if (currentOffset == maxOffsetInQueue) {
return CompletableFuture.completedFuture(false);
}
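// Build the next batch, bounded by the group commit message count and byte size.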
long bufferSize = 0L;
long groupCommitSize = storeConfig.getTieredStoreGroupCommitSize();
long groupCommitCount = storeConfig.getTieredStoreGroupCommitCount();
long targetOffset = Math.min(currentOffset + groupCommitCount, maxOffsetInQueue);
ConsumeQueueInterface consumeQueue = defaultStore.getConsumeQueue(topic, queueId);
CqUnit cqUnit = consumeQueue.get(currentOffset);
if (cqUnit == null) {
log.warn("MessageDispatcher#dispatch cq not found, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
return CompletableFuture.completedFuture(false);
}
SelectMappedBufferResult message =
defaultStore.selectOneMessageByOffset(cqUnit.getPos(), cqUnit.getSize());
if (message == null) {
log.warn("MessageDispatcher#dispatch message not found, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
return CompletableFuture.completedFuture(false);
}
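// Commit now only if the head message has waited past the group commit timeout, the backlog exceeds the group commit count, or the commit is forced.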
boolean timeout = MessageFormatUtil.getStoreTimeStamp(message.getByteBuffer()) +
storeConfig.getTieredStoreGroupCommitTimeout() < System.currentTimeMillis();
boolean bufferFull = maxOffsetInQueue - currentOffset > storeConfig.getTieredStoreGroupCommitCount();
if (!timeout && !bufferFull && !force) {
log.debug("MessageDispatcher#dispatch hold, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
message.release();
return CompletableFuture.completedFuture(false);
} else {
if (MessageFormatUtil.getStoreTimeStamp(message.getByteBuffer()) +
TimeUnit.MINUTES.toMillis(5) < System.currentTimeMillis()) {
log.warn("MessageDispatcher#dispatch behind too much, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
} else {
log.info("MessageDispatcher#dispatch success, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
}
message.release();
}
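// Append messages and their consume queue entries to the flat file until a batch limit is hit or an append fails.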
long offset = currentOffset;
List<SelectMappedBufferResult> appendingBufferList = new ArrayList<>();
List<DispatchRequest> dispatchRequestList = new ArrayList<>();
for (; offset < targetOffset; offset++) {
cqUnit = consumeQueue.get(offset);
if (cqUnit == null) {
// The consume queue entry may have been removed concurrently; stop the batch here.
break;
}
bufferSize += cqUnit.getSize();
if (bufferSize >= groupCommitSize) {
break;
}
message = defaultStore.selectOneMessageByOffset(cqUnit.getPos(), cqUnit.getSize());
if (message == null) {
// The message may no longer be readable from the commit log; stop the batch here.
break;
}
appendingBufferList.add(message);
ByteBuffer byteBuffer = message.getByteBuffer();
AppendResult result = flatFile.appendCommitLog(message);
if (!AppendResult.SUCCESS.equals(result)) {
break;
}
long mappedCommitLogOffset = flatFile.getCommitLogMaxOffset() - byteBuffer.remaining();
Map<String, String> properties = MessageFormatUtil.getProperties(byteBuffer);
DispatchRequest dispatchRequest = new DispatchRequest(topic, queueId, mappedCommitLogOffset,
cqUnit.getSize(), cqUnit.getTagsCode(), MessageFormatUtil.getStoreTimeStamp(byteBuffer),
cqUnit.getQueueOffset(), properties.getOrDefault(MessageConst.PROPERTY_KEYS, ""),
properties.getOrDefault(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, ""),
0, 0, new HashMap<>());
dispatchRequest.setOffsetId(MessageFormatUtil.getOffsetId(byteBuffer));
result = flatFile.appendConsumeQueue(dispatchRequest);
if (!AppendResult.SUCCESS.equals(result)) {
break;
} else {
dispatchRequestList.add(dispatchRequest);
}
}
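// Track the appended buffers and dispatch requests so they can be released and indexed once the commit completes.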
GroupCommitContext groupCommitContext = new GroupCommitContext();
groupCommitContext.setEndOffset(offset);
groupCommitContext.setBufferList(appendingBufferList);
groupCommitContext.setDispatchRequests(dispatchRequestList);
// If the head message timed out or many messages are still waiting to be uploaded, trigger another dispatch round right after this commit succeeds.
boolean repeat = timeout || maxOffsetInQueue - offset > storeConfig.getTieredStoreGroupCommitCount();
if (!dispatchRequestList.isEmpty()) {
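// Record dispatch metrics, then commit the appended data asynchronously.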
Attributes attributes = TieredStoreMetricsManager.newAttributesBuilder()
.put(TieredStoreMetricsConstant.LABEL_TOPIC, topic)
.put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, queueId)
.put(TieredStoreMetricsConstant.LABEL_FILE_TYPE, FileSegmentType.COMMIT_LOG.name().toLowerCase())
.build();
TieredStoreMetricsManager.messagesDispatchTotal.add(offset - currentOffset, attributes);
this.commitAsync(flatFile).whenComplete((success, throwable) -> {
boolean committed = throwable == null && Boolean.TRUE.equals(success);
if (committed) {
constructIndexFile(flatFile.getTopicId(), groupCommitContext);
} else {
// Defer index construction; a later dispatch round will find this context
// in failedGroupCommitMap and build the index once its offset is committed.
GroupCommitContext oldCommit = failedGroupCommitMap.put(flatFile, groupCommitContext);
if (oldCommit != null) {
log.warn("MessageDispatcher#commitAsync failed, releasing the previous failed commit context, topic={}, queueId={}", topic, queueId);
oldCommit.release();
}
}
if (committed && repeat) {
storeExecutor.commonExecutor.submit(() -> dispatchWithSemaphore(flatFile));
}
});
}
} catch (ConsumeQueueException e) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
} finally {
flatFile.getFileLock().unlock();
}
return CompletableFuture.completedFuture(false);
}