Result BatchMessageContainerBase::createOpSendMsgHelper()

in lib/BatchMessageContainerBase.cc [40:89]


// Populates `opSendMsg` from `batch`.
//
// The send callback and the message count/size are written unconditionally, so they are set
// even when an error code is returned. When `flushCallback` is non-empty it is invoked right
// after the batch's own send callback, with the same Result.
//
// The message object obtained from `batch.msgImpl()` is modified through the pointer: its
// metadata receives the batch size and compression info, and its payload is replaced by the
// encoded (and, if a crypto object is still alive and encryption is enabled, encrypted) buffer.
//
// Returns:
//   ResultOperationNotSupported - `batch` is empty
//   ResultCryptoError           - encryption was attempted and failed
//   ResultMessageTooBig         - the final payload exceeds ClientConnection::getMaxMessageSize()
//   ResultOk                    - `opSendMsg` has been completely filled in
Result BatchMessageContainerBase::createOpSendMsgHelper(OpSendMsg& opSendMsg,
                                                        const FlushCallback& flushCallback,
                                                        const MessageAndCallbackBatch& batch) const {
    // Callback and counters come first so that every return path below leaves them populated.
    opSendMsg.sendCallback_ = batch.createSendCallback();
    opSendMsg.messagesCount_ = batch.messagesCount();
    opSendMsg.messagesSize_ = batch.messagesSize();

    if (flushCallback) {
        // Chain the flush callback behind the batch's own callback.
        auto batchCallback = opSendMsg.sendCallback_;
        opSendMsg.sendCallback_ = [batchCallback, flushCallback](Result result, const MessageId& msgId) {
            batchCallback(result, msgId);
            flushCallback(result);
        };
    }

    if (batch.empty()) {
        return ResultOperationNotSupported;
    }

    MessageImplPtr batchedMsg = batch.msgImpl();
    auto& metadata = batchedMsg->metadata;
    auto& payload = batchedMsg->payload;

    metadata.set_num_messages_in_batch(batch.size());

    // Compression metadata is recorded only for a real codec; the payload is still passed
    // through the codec selected for `codecType` in either case.
    auto codecType = producerConfig_.getCompressionType();
    if (codecType != CompressionNone) {
        metadata.set_compression(static_cast<proto::CompressionType>(codecType));
        // Must be read before the payload is replaced by its encoded form below.
        metadata.set_uncompressed_size(payload.readableBytes());
    }
    payload = CompressionCodecProvider::getCodec(codecType).encode(payload);

    // Encryption needs both a live crypto object and the producer setting.
    auto crypto = msgCryptoWeakPtr_.lock();
    if (crypto && producerConfig_.isEncryptionEnabled()) {
        SharedBuffer cipherText;
        if (crypto->encrypt(producerConfig_.getEncryptionKeys(), producerConfig_.getCryptoKeyReader(),
                            metadata, payload, cipherText)) {
            payload = cipherText;
        } else {
            return ResultCryptoError;
        }
    }

    // The size limit applies to the payload after compression and encryption.
    if (payload.readableBytes() > ClientConnection::getMaxMessageSize()) {
        return ResultMessageTooBig;
    }

    opSendMsg.producerId_ = producerId_;
    opSendMsg.sequenceId_ = metadata.sequence_id();
    opSendMsg.metadata_ = metadata;
    opSendMsg.payload_ = payload;
    opSendMsg.timeout_ = TimeUtils::now() + milliseconds(producerConfig_.getSendTimeout());

    return ResultOk;
}