public CompletableFuture asyncPutMessages()

in store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java [657:798]


    public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
        final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());

        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
        }
        if (messageExtBatch.getDelayTimeLevel() > 0) {
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
        }

        // Set the storage time
        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
            messageExtBatch.setBornHostV6Flag();
        }

        InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
            messageExtBatch.setStoreHostAddressV6Flag();
        }

        messageExtBatch.setVersion(MessageVersion.MESSAGE_VERSION_V1);
        boolean autoMessageVersionOnTopicLen =
            this.defaultMessageStore.getMessageStoreConfig().isAutoMessageVersionOnTopicLen();
        if (autoMessageVersionOnTopicLen && messageExtBatch.getTopic().length() > Byte.MAX_VALUE) {
            messageExtBatch.setVersion(MessageVersion.MESSAGE_VERSION_V2);
        }

        // Back to Results
        AppendMessageResult appendResult;
        BatchAppendFuture<AppendEntryResponse> dledgerFuture;
        EncodeResult encodeResult;

        encodeResult = this.messageSerializer.serialize(messageExtBatch);
        if (encodeResult.status != AppendMessageStatus.PUT_OK) {
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
                .status)));
        }

        int batchNum = encodeResult.batchData.size();
        topicQueueLock.lock(encodeResult.queueOffsetKey);
        try {
            defaultMessageStore.assignOffset(messageExtBatch);

            putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
            msgIdBuilder.setLength(0);
            long elapsedTimeInLock;
            long queueOffset;
            int msgNum = 0;
            try {
                beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
                queueOffset = getQueueOffsetByKey(messageExtBatch, tranType);
                encodeResult.setQueueOffsetKey(queueOffset, true);
                BatchAppendEntryRequest request = new BatchAppendEntryRequest();
                request.setGroup(dLedgerConfig.getGroup());
                request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
                request.setBatchMsgs(encodeResult.batchData);
                AppendFuture<AppendEntryResponse> appendFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
                if (appendFuture.getPos() == -1) {
                    log.warn("HandleAppend return false due to error code {}", appendFuture.get().getCode());
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGE_CACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
                }
                dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) appendFuture;

                long wroteOffset = 0;

                int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
                ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);

                boolean isFirstOffset = true;
                long firstWroteOffset = 0;
                for (long pos : dledgerFuture.getPositions()) {
                    wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
                    if (isFirstOffset) {
                        firstWroteOffset = wroteOffset;
                        isFirstOffset = false;
                    }
                    String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset);
                    if (msgIdBuilder.length() > 0) {
                        msgIdBuilder.append(',').append(msgId);
                    } else {
                        msgIdBuilder.append(msgId);
                    }
                    msgNum++;
                }

                elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
                appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen,
                    msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
                appendResult.setMsgNum(msgNum);
            } finally {
                beginTimeInDledgerLock = 0;
                putMessageLock.unlock();
            }

            if (elapsedTimeInLock > 500) {
                log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}",
                    elapsedTimeInLock, messageExtBatch.getBody().length, appendResult);
            }

            defaultMessageStore.increaseOffset(messageExtBatch, (short) batchNum);

        } catch (Exception e) {
            log.error("Put message error", e);
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
        }  finally {
            topicQueueLock.unlock(encodeResult.queueOffsetKey);
        }

        return dledgerFuture.thenApply(appendEntryResponse -> {
            PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
            switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
                case SUCCESS:
                    putMessageStatus = PutMessageStatus.PUT_OK;
                    break;
                case INCONSISTENT_LEADER:
                case NOT_LEADER:
                case LEADER_NOT_READY:
                case DISK_FULL:
                    putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
                    break;
                case WAIT_QUORUM_ACK_TIMEOUT:
                    //Do not return flush_slave_timeout to the client, for the client will ignore it.
                    putMessageStatus = PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH;
                    break;
                case LEADER_PENDING_FULL:
                    putMessageStatus = PutMessageStatus.OS_PAGE_CACHE_BUSY;
                    break;
            }
            PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
            if (putMessageStatus == PutMessageStatus.PUT_OK) {
                // Statistics
                storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(appendResult.getMsgNum());
                storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(appendResult.getWroteBytes());
            }
            return putMessageResult;
        });
    }