void ProducerImpl::sendAsyncWithStatsUpdate()

in lib/ProducerImpl.cc [465:626]


void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallback& callback) {
    if (!isValidProducerState(callback)) {
        return;
    }

    // Convert the payload before sending the message.
    msg.impl_->convertKeyValueToPayload(conf_.getSchema());
    const auto& uncompressedPayload = msg.impl_->payload;
    const uint32_t uncompressedSize = uncompressedPayload.readableBytes();
    const auto result = canEnqueueRequest(uncompressedSize);
    if (result != ResultOk) {
        // If queue is full sending the batch immediately, no point waiting till batchMessagetimeout
        if (batchMessageContainer_) {
            LOG_DEBUG(getName() << " - sending batch message immediately");
            Lock lock(mutex_);
            auto failures = batchMessageAndSend();
            lock.unlock();
            failures.complete();
        }

        callback(result, {});
        return;
    }

    // We have already reserved a spot, so if we need to early return for failed result, we should release the
    // semaphore and memory first.
    const auto handleFailedResult = [this, uncompressedSize, callback](Result result) {
        releaseSemaphore(uncompressedSize);  // it releases the memory as well
        callback(result, {});
    };

    auto& msgMetadata = msg.impl_->metadata;
    const bool compressed = !canAddToBatch(msg);
    const auto payload =
        compressed ? applyCompression(uncompressedPayload, conf_.getCompressionType()) : uncompressedPayload;
    const auto compressedSize = static_cast<uint32_t>(payload.readableBytes());
    const auto maxMessageSize = static_cast<uint32_t>(ClientConnection::getMaxMessageSize());

    if (!msgMetadata.has_replicated_from() && msgMetadata.has_producer_name()) {
        handleFailedResult(ResultInvalidMessage);
        return;
    }

    Lock lock(mutex_);
    uint64_t sequenceId;
    if (!msgMetadata.has_sequence_id()) {
        sequenceId = msgSequenceGenerator_++;
    } else {
        sequenceId = msgMetadata.sequence_id();
    }
    setMessageMetadata(msg, sequenceId, uncompressedSize);

    auto payloadChunkSize = maxMessageSize;
    int totalChunks;
    if (!compressed || !chunkingEnabled_) {
        totalChunks = 1;
    } else {
        const auto metadataSize = static_cast<uint32_t>(msgMetadata.ByteSizeLong());
        if (metadataSize >= maxMessageSize) {
            LOG_WARN(getName() << " - metadata size " << metadataSize << " cannot exceed " << maxMessageSize
                               << " bytes");
            handleFailedResult(ResultMessageTooBig);
            return;
        }
        payloadChunkSize = maxMessageSize - metadataSize;
        totalChunks = getNumOfChunks(compressedSize, payloadChunkSize);
    }

    // Each chunk should be sent individually, so try to acquire extra permits for chunks.
    for (int i = 0; i < (totalChunks - 1); i++) {
        const auto result = canEnqueueRequest(0);  // size is 0 because the memory has already reserved
        if (result != ResultOk) {
            handleFailedResult(result);
            return;
        }
    }

    if (canAddToBatch(msg)) {
        // Batching is enabled and the message is not delayed
        if (!batchMessageContainer_->hasEnoughSpace(msg)) {
            batchMessageAndSend().complete();
        }
        bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg);
        bool isFull = batchMessageContainer_->add(msg, callback);
        if (isFirstMessage) {
            batchTimer_->expires_from_now(
                boost::posix_time::milliseconds(conf_.getBatchingMaxPublishDelayMs()));
            auto weakSelf = weak_from_this();
            batchTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) {
                auto self = weakSelf.lock();
                if (!self) {
                    return;
                }
                if (ec) {
                    LOG_DEBUG(getName() << " Ignoring timer cancelled event, code[" << ec << "]");
                    return;
                }
                LOG_DEBUG(getName() << " - Batch Message Timer expired");

                // ignore if the producer is already closing/closed
                const auto state = state_.load();
                if (state == Pending || state == Ready) {
                    Lock lock(mutex_);
                    auto failures = batchMessageAndSend();
                    lock.unlock();
                    failures.complete();
                }
            });
        }

        if (isFull) {
            auto failures = batchMessageAndSend();
            lock.unlock();
            failures.complete();
        }
    } else {
        const bool sendChunks = (totalChunks > 1);
        if (sendChunks) {
            msgMetadata.set_uuid(producerName_ + "-" + std::to_string(sequenceId));
            msgMetadata.set_num_chunks_from_msg(totalChunks);
            msgMetadata.set_total_chunk_msg_size(compressedSize);
        }

        auto chunkMessageId = totalChunks > 1 ? std::make_shared<ChunkMessageIdImpl>() : nullptr;

        int beginIndex = 0;
        for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
            if (sendChunks) {
                msgMetadata.set_chunk_id(chunkId);
            }
            const uint32_t endIndex = std::min(compressedSize, beginIndex + payloadChunkSize);
            auto chunkedPayload = payload.slice(beginIndex, endIndex - beginIndex);
            beginIndex = endIndex;

            SharedBuffer encryptedPayload;
            if (!encryptMessage(msgMetadata, chunkedPayload, encryptedPayload)) {
                handleFailedResult(ResultCryptoError);
                return;
            }
            OpSendMsg op{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr,
                         producerId_, sequenceId,       conf_.getSendTimeout(),
                         1,           uncompressedSize, chunkMessageId};

            if (!chunkingEnabled_) {
                const uint32_t msgMetadataSize = op.metadata_.ByteSize();
                const uint32_t payloadSize = op.payload_.readableBytes();
                const uint32_t msgHeadersAndPayloadSize = msgMetadataSize + payloadSize;
                if (msgHeadersAndPayloadSize > maxMessageSize) {
                    lock.unlock();
                    releaseSemaphoreForSendOp(op);
                    LOG_WARN(getName()
                             << " - compressed Message size " << msgHeadersAndPayloadSize << " cannot exceed "
                             << maxMessageSize << " bytes unless chunking is enabled");
                    handleFailedResult(ResultMessageTooBig);
                    return;
                }
            }

            sendMessage(op);
        }
    }
}