void ProducerImpl::sendAsyncWithStatsUpdate()

in lib/ProducerImpl.cc [488:648]


// Sends `msg` asynchronously.
//
// Flow, as implemented below:
//   1. Bail out if isValidProducerState() rejects the current state (it receives the callback).
//   2. Reserve one queue permit plus `uncompressedSize` bytes via canEnqueueRequest(). When that fails and a
//      batch container exists, the pending batch is flushed right away before the callback is failed.
//   3. Assign the sequence id and metadata under `mutex_`.
//   4. Either add the message to the batch container (arming the batch timer for the first message and
//      flushing when the container reports it is full), or send it directly as one OpSendMsg per chunk.
//
// Every early-return error path after step 2 gives back what was reserved and invokes `callback` with the
// failing Result. These paths release `mutex_` first, so the user callback never runs under the producer
// lock.
void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& callback) {
    if (!isValidProducerState(callback)) {
        return;
    }

    // Convert the payload before sending the message.
    msg.impl_->convertKeyValueToPayload(conf_.getSchema());
    const auto& uncompressedPayload = msg.impl_->payload;
    const uint32_t uncompressedSize = uncompressedPayload.readableBytes();
    const auto result = canEnqueueRequest(uncompressedSize);
    if (result != ResultOk) {
        // If queue is full sending the batch immediately, no point waiting till batchMessagetimeout
        if (batchMessageContainer_) {
            LOG_DEBUG(getName() << " - sending batch message immediately");
            Lock lock(mutex_);
            auto failures = batchMessageAndSend();
            lock.unlock();
            failures.complete();
        }

        callback(result, {});
        return;
    }

    // We have already reserved a spot, so if we need to early return for failed result, we should release the
    // semaphore and memory first.
    // The lambda is only invoked synchronously inside this function, so `callback` is captured by reference
    // to avoid copying it on every send.
    const auto handleFailedResult = [this, uncompressedSize, &callback](Result failedResult) {
        releaseSemaphore(uncompressedSize);  // it releases the memory as well
        callback(failedResult, {});
    };

    auto& msgMetadata = msg.impl_->metadata;
    // Messages that go into a batch are not compressed individually here.
    const bool compressed = !canAddToBatch(msg);
    const auto payload =
        compressed ? applyCompression(uncompressedPayload, conf_.getCompressionType()) : uncompressedPayload;
    const auto compressedSize = static_cast<uint32_t>(payload.readableBytes());
    const auto maxMessageSize = static_cast<uint32_t>(ClientConnection::getMaxMessageSize());

    // A producer name that is already set is only accepted together with a replicated-from field.
    if (!msgMetadata.has_replicated_from() && msgMetadata.has_producer_name()) {
        handleFailedResult(ResultInvalidMessage);
        return;
    }

    Lock lock(mutex_);
    uint64_t sequenceId;
    if (!msgMetadata.has_sequence_id()) {
        sequenceId = msgSequenceGenerator_++;
    } else {
        sequenceId = msgMetadata.sequence_id();
    }
    setMessageMetadata(msg, sequenceId, uncompressedSize);

    auto payloadChunkSize = maxMessageSize;
    int totalChunks;
    if (!compressed || !chunkingEnabled_) {
        totalChunks = 1;
    } else {
        const auto metadataSize = static_cast<uint32_t>(msgMetadata.ByteSizeLong());
        if (metadataSize >= maxMessageSize) {
            lock.unlock();  // never run the user callback while holding the producer lock
            LOG_WARN(getName() << " - metadata size " << metadataSize << " cannot exceed " << maxMessageSize
                               << " bytes");
            handleFailedResult(ResultMessageTooBig);
            return;
        }
        payloadChunkSize = maxMessageSize - metadataSize;
        totalChunks = getNumOfChunks(compressedSize, payloadChunkSize);
    }

    // Each chunk should be sent individually, so try to acquire extra permits for chunks.
    for (int i = 0; i < (totalChunks - 1); i++) {
        const auto permitResult =
            canEnqueueRequest(0);  // size is 0 because the memory has already reserved
        if (permitResult != ResultOk) {
            lock.unlock();
            // `i` extra permits were acquired before this failure; give them back (with no memory attached)
            // so they are not leaked. handleFailedResult() returns the original permit and the memory.
            for (int acquired = 0; acquired < i; acquired++) {
                releaseSemaphore(0);
            }
            handleFailedResult(permitResult);
            return;
        }
    }

    if (canAddToBatch(msg)) {
        // Batching is enabled and the message is not delayed
        if (!batchMessageContainer_->hasEnoughSpace(msg)) {
            batchMessageAndSend().complete();
        }
        bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg);
        bool isFull = batchMessageContainer_->add(msg, callback);
        if (isFirstMessage) {
            batchTimer_->expires_from_now(milliseconds(conf_.getBatchingMaxPublishDelayMs()));
            // The weak pointer keeps the handler from touching a destroyed producer.
            auto weakSelf = weak_from_this();
            batchTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
                auto self = weakSelf.lock();
                if (!self) {
                    return;
                }
                if (ec) {
                    LOG_DEBUG(getName() << " Ignoring timer cancelled event, code[" << ec << "]");
                    return;
                }
                LOG_DEBUG(getName() << " - Batch Message Timer expired");

                // ignore if the producer is already closing/closed
                const auto state = state_.load();
                if (state == Pending || state == Ready) {
                    Lock lock(mutex_);
                    auto failures = batchMessageAndSend();
                    lock.unlock();
                    failures.complete();
                }
            });
        }

        if (isFull) {
            auto failures = batchMessageAndSend();
            lock.unlock();
            failures.complete();
        }
    } else {
        const bool sendChunks = (totalChunks > 1);
        ChunkMessageIdListPtr chunkMessageIdList;
        if (sendChunks) {
            msgMetadata.set_uuid(producerName_ + "-" + std::to_string(sequenceId));
            msgMetadata.set_num_chunks_from_msg(totalChunks);
            msgMetadata.set_total_chunk_msg_size(compressedSize);
            chunkMessageIdList = std::make_shared<std::vector<MessageId>>();
        }

        // Invariant: beginIndex <= compressedSize, so the subtraction below cannot underflow.
        uint32_t beginIndex = 0;
        for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
            if (sendChunks) {
                msgMetadata.set_chunk_id(chunkId);
            }
            // Take at most `payloadChunkSize` bytes of what is left; computing the length from the remainder
            // avoids the unsigned wrap-around that `beginIndex + payloadChunkSize` could hit.
            const uint32_t chunkLength = std::min(compressedSize - beginIndex, payloadChunkSize);
            auto chunkedPayload = payload.slice(beginIndex, chunkLength);
            beginIndex += chunkLength;

            SharedBuffer encryptedPayload;
            if (!encryptMessage(msgMetadata, chunkedPayload, encryptedPayload)) {
                lock.unlock();
                // Chunks after this one will never be sent, so the extra permits reserved for them must be
                // returned here. NOTE(review): assumes each chunk already passed to sendMessage() gives back
                // its own permit when its op completes - confirm against the op completion path.
                for (int unsent = chunkId + 1; unsent < totalChunks; unsent++) {
                    releaseSemaphore(0);
                }
                handleFailedResult(ResultCryptoError);
                return;
            }

            // Only the op of the last chunk carries the user callback.
            auto op = OpSendMsg::create(msgMetadata, 1, uncompressedSize, conf_.getSendTimeout(),
                                        (chunkId == totalChunks - 1) ? callback : nullptr, chunkMessageIdList,
                                        producerId_, encryptedPayload);

            if (!chunkingEnabled_) {
                const uint32_t msgMetadataSize = op->sendArgs->metadata.ByteSizeLong();
                const uint32_t payloadSize = op->sendArgs->payload.readableBytes();
                const uint32_t msgHeadersAndPayloadSize = msgMetadataSize + payloadSize;
                if (msgHeadersAndPayloadSize > maxMessageSize) {
                    lock.unlock();
                    LOG_WARN(getName()
                             << " - compressed Message size " << msgHeadersAndPayloadSize << " cannot exceed "
                             << maxMessageSize << " bytes unless chunking is enabled");
                    handleFailedResult(ResultMessageTooBig);
                    return;
                }
            }

            sendMessage(std::move(op));
        }
    }
}