public CompletableFuture doScheduleDispatch()

in tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java [125:329]


    /**
     * Runs one dispatch round for the queue backing {@code flatFile}: reads messages from the
     * default store starting at the flat file's consume queue max offset, appends them to the
     * flat file's commit log and consume queue, and then triggers an asynchronous commit.
     *
     * <p>The flat file lock is held for the whole synchronous part of the round and is always
     * released before this method returns. The commit itself completes asynchronously and its
     * outcome is not reflected in the returned future.
     *
     * @param flatFile the flat file to dispatch into
     * @param force    when {@code true} (or when group commit is disabled in the store config)
     *                 the lock is acquired blocking and the batch is dispatched even if neither
     *                 the timeout nor the buffer-full condition is met
     * @return a completed future holding {@code true} when the dispatcher is stopped, when the
     *         flat file offset was just initialized, or when the flat file was recreated because
     *         its offset fell below the queue's min offset; {@code false} in every other case
     *         (including after a batch was handed to {@code commitAsync}); or a future completed
     *         exceptionally with the {@link ConsumeQueueException} raised while reading the queue
     */
    public CompletableFuture<Boolean> doScheduleDispatch(FlatFileInterface flatFile, boolean force) {
        if (stopped) {
            return CompletableFuture.completedFuture(true);
        }

        String topic = flatFile.getMessageQueue().getTopic();
        int queueId = flatFile.getMessageQueue().getQueueId();

        // For test scenarios, we set the 'force' variable to true to
        // ensure that the data in the cache is directly committed successfully.
        force = !storeConfig.isTieredStoreGroupCommit() || force;
        if (force) {
            flatFile.getFileLock().lock();
        } else {
            // Non-forced rounds never block: skip this round if the file is busy.
            if (!flatFile.getFileLock().tryLock()) {
                return CompletableFuture.completedFuture(false);
            }
        }

        try {
            if (topicFilter != null && topicFilter.filterTopic(topic)) {
                flatFileStore.destroyFile(flatFile.getMessageQueue());
                return CompletableFuture.completedFuture(false);
            }

            long currentOffset = flatFile.getConsumeQueueMaxOffset();
            long commitOffset = flatFile.getConsumeQueueCommitOffset();
            long minOffsetInQueue = defaultStore.getMinOffsetInQueue(topic, queueId);
            long maxOffsetInQueue = defaultStore.getMaxOffsetInQueue(topic, queueId);

            // If set to max offset here, some written messages may be lost
            if (!flatFile.isFlatFileInit()) {
                // Start from the offset stored two minutes ago, clamped into [min, max].
                currentOffset = defaultStore.getOffsetInQueueByTime(
                    topic, queueId, System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(2));
                currentOffset = Math.max(currentOffset, minOffsetInQueue);
                currentOffset = Math.min(currentOffset, maxOffsetInQueue);
                flatFile.initOffset(currentOffset);
                log.warn("MessageDispatcher#dispatch init, topic={}, queueId={}, offset={}-{}, current={}",
                    topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset);
                return CompletableFuture.completedFuture(true);
            }

            // If the previous commit fails, attempt to trigger a commit directly.
            if (commitOffset < currentOffset) {
                this.commitAsync(flatFile).whenComplete((result, throwable) -> {
                    if (throwable != null) {
                        log.error("MessageDispatcher#flatFile commitOffset less than currentOffset, commitAsync again failed. topic: {}, queueId: {} ", topic, queueId, throwable);
                    }
                });
                return CompletableFuture.completedFuture(false);
            }

            // A previously failed commit whose range is now covered by the commit offset
            // can finally have its index built.
            if (failedGroupCommitMap.containsKey(flatFile)) {
                GroupCommitContext failedCommit = failedGroupCommitMap.get(flatFile);
                if (failedCommit.getEndOffset() <= commitOffset) {
                    failedGroupCommitMap.remove(flatFile);
                    constructIndexFile(flatFile.getTopicId(), failedCommit);
                }
            }

            if (currentOffset < minOffsetInQueue) {
                log.warn("MessageDispatcher#dispatch, current offset is too small, topic={}, queueId={}, offset={}-{}, current={}",
                    topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset);
                flatFileStore.destroyFile(flatFile.getMessageQueue());
                flatFileStore.computeIfAbsent(new MessageQueue(topic, brokerName, queueId));
                return CompletableFuture.completedFuture(true);
            }

            if (currentOffset > maxOffsetInQueue) {
                log.warn("MessageDispatcher#dispatch, current offset is too large, topic={}, queueId={}, offset={}-{}, current={}",
                    topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset);
                return CompletableFuture.completedFuture(false);
            }

            long interval = TimeUnit.HOURS.toMillis(storeConfig.getCommitLogRollingInterval());
            if (flatFile.rollingFile(interval)) {
                log.info("MessageDispatcher#dispatch, rolling file, topic={}, queueId={}, offset={}-{}, current={}",
                    topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset);
            }

            // Nothing new to dispatch.
            if (currentOffset == maxOffsetInQueue) {
                return CompletableFuture.completedFuture(false);
            }

            long bufferSize = 0L;
            long groupCommitSize = storeConfig.getTieredStoreGroupCommitSize();
            long groupCommitCount = storeConfig.getTieredStoreGroupCommitCount();
            long targetOffset = Math.min(currentOffset + groupCommitCount, maxOffsetInQueue);

            ConsumeQueueInterface consumeQueue = defaultStore.getConsumeQueue(topic, queueId);
            CqUnit cqUnit = consumeQueue.get(currentOffset);
            if (cqUnit == null) {
                log.warn("MessageDispatcher#dispatch cq not found, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
                    topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
                return CompletableFuture.completedFuture(false);
            }

            SelectMappedBufferResult message =
                defaultStore.selectOneMessageByOffset(cqUnit.getPos(), cqUnit.getSize());
            if (message == null) {
                log.warn("MessageDispatcher#dispatch message not found, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
                    topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
                return CompletableFuture.completedFuture(false);
            }

            // The head message is only needed for its store timestamp; release it right away
            // so it cannot be left retained if reading the timestamp throws.
            long headStoreTimestamp;
            try {
                headStoreTimestamp = MessageFormatUtil.getStoreTimeStamp(message.getByteBuffer());
            } finally {
                message.release();
            }

            boolean timeout = headStoreTimestamp +
                storeConfig.getTieredStoreGroupCommitTimeout() < System.currentTimeMillis();
            boolean bufferFull = maxOffsetInQueue - currentOffset > groupCommitCount;

            if (!timeout && !bufferFull && !force) {
                log.debug("MessageDispatcher#dispatch hold, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
                    topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
                return CompletableFuture.completedFuture(false);
            }

            if (headStoreTimestamp + TimeUnit.MINUTES.toMillis(5) < System.currentTimeMillis()) {
                log.warn("MessageDispatcher#dispatch behind too much, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
                    topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
            } else {
                log.info("MessageDispatcher#dispatch success, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
                    topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
            }

            long offset = currentOffset;
            List<SelectMappedBufferResult> appendingBufferList = new ArrayList<>();
            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
            for (; offset < targetOffset; offset++) {
                CqUnit unit = consumeQueue.get(offset);
                if (unit == null) {
                    // Stop the batch here instead of failing with an NPE; whatever was
                    // appended so far is still committed below.
                    log.warn("MessageDispatcher#dispatch cq not found in batch, topic={}, queueId={}, offset={}",
                        topic, queueId, offset);
                    break;
                }
                bufferSize += unit.getSize();
                if (bufferSize >= groupCommitSize) {
                    break;
                }
                SelectMappedBufferResult buffer =
                    defaultStore.selectOneMessageByOffset(unit.getPos(), unit.getSize());
                if (buffer == null) {
                    // Same as above: never put a null into the buffer list or dereference it.
                    log.warn("MessageDispatcher#dispatch message not found in batch, topic={}, queueId={}, offset={}",
                        topic, queueId, offset);
                    break;
                }
                appendingBufferList.add(buffer);

                ByteBuffer byteBuffer = buffer.getByteBuffer();
                AppendResult result = flatFile.appendCommitLog(buffer);
                if (!AppendResult.SUCCESS.equals(result)) {
                    break;
                }

                // Position of this message inside the flat file's commit log after the append.
                long mappedCommitLogOffset = flatFile.getCommitLogMaxOffset() - byteBuffer.remaining();
                Map<String, String> properties = MessageFormatUtil.getProperties(byteBuffer);

                DispatchRequest dispatchRequest = new DispatchRequest(topic, queueId, mappedCommitLogOffset,
                    unit.getSize(), unit.getTagsCode(), MessageFormatUtil.getStoreTimeStamp(byteBuffer),
                    unit.getQueueOffset(), properties.getOrDefault(MessageConst.PROPERTY_KEYS, ""),
                    properties.getOrDefault(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, ""),
                    0, 0, new HashMap<>());
                dispatchRequest.setOffsetId(MessageFormatUtil.getOffsetId(byteBuffer));

                result = flatFile.appendConsumeQueue(dispatchRequest);
                if (!AppendResult.SUCCESS.equals(result)) {
                    break;
                }
                dispatchRequestList.add(dispatchRequest);
            }

            GroupCommitContext groupCommitContext = new GroupCommitContext();
            groupCommitContext.setEndOffset(offset);
            groupCommitContext.setBufferList(appendingBufferList);
            groupCommitContext.setDispatchRequests(dispatchRequestList);

            // If there are many messages waiting to be uploaded, call the upload logic immediately.
            boolean repeat = timeout || maxOffsetInQueue - offset > groupCommitCount;

            // NOTE(review): when no request was queued, buffers already collected in
            // appendingBufferList are not released by this method; confirm whether the
            // flat file takes ownership of them in appendCommitLog.
            if (!dispatchRequestList.isEmpty()) {
                Attributes attributes = TieredStoreMetricsManager.newAttributesBuilder()
                    .put(TieredStoreMetricsConstant.LABEL_TOPIC, topic)
                    .put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, queueId)
                    .put(TieredStoreMetricsConstant.LABEL_FILE_TYPE, FileSegmentType.COMMIT_LOG.name().toLowerCase())
                    .build();
                TieredStoreMetricsManager.messagesDispatchTotal.add(offset - currentOffset, attributes);

                this.commitAsync(flatFile).whenComplete((success, throwable) -> {
                    // 'success' is null when the commit completed exceptionally; unboxing it
                    // directly would throw inside this callback and skip the failure handling.
                    boolean committed = throwable == null && Boolean.TRUE.equals(success);
                    if (throwable != null) {
                        log.error("MessageDispatcher#commitAsync failed with exception, topic={}, queueId={}",
                            topic, queueId, throwable);
                    }
                    if (committed) {
                        constructIndexFile(flatFile.getTopicId(), groupCommitContext);
                    } else {
                        // Keep the context so a later round can build the index once the
                        // commit offset catches up; drop any older pending context.
                        GroupCommitContext oldCommit = failedGroupCommitMap.put(flatFile, groupCommitContext);
                        if (oldCommit != null) {
                            log.warn("MessageDispatcher#commitAsync failed,flatFile old failed commit context not release, topic={}, queueId={}  ", topic, queueId);
                            oldCommit.release();
                        }
                    }
                    if (committed && repeat) {
                        storeExecutor.commonExecutor.submit(() -> dispatchWithSemaphore(flatFile));
                    }
                });
            }
        } catch (ConsumeQueueException e) {
            CompletableFuture<Boolean> future = new CompletableFuture<>();
            future.completeExceptionally(e);
            return future;
        } finally {
            flatFile.getFileLock().unlock();
        }
        return CompletableFuture.completedFuture(false);
    }