in lib/BatchMessageContainerBase.cc [40:89]
/**
 * Populate `opSendMsg` from the messages accumulated in `batch`.
 *
 * The send callback (optionally chained with `flushCallback`), the message count and the
 * message size are written to `opSendMsg` before any validation, so they are set even when
 * a non-OK result is returned. The batch's message is then updated in place: batch metadata
 * is filled in, the payload is passed through the configured compression codec and, when a
 * crypto object is available and encryption is enabled, encrypted.
 *
 * @param opSendMsg     output structure that receives callback, metadata, payload, ids and timeout
 * @param flushCallback optional callback; when set it is invoked with the send result right after
 *                      the batch's own send callback
 * @param batch         the batch to convert
 * @return ResultOperationNotSupported if `batch` is empty,
 *         ResultCryptoError if encryption fails,
 *         ResultMessageTooBig if the final payload exceeds ClientConnection::getMaxMessageSize(),
 *         ResultOk otherwise
 */
Result BatchMessageContainerBase::createOpSendMsgHelper(OpSendMsg& opSendMsg,
                                                        const FlushCallback& flushCallback,
                                                        const MessageAndCallbackBatch& batch) const {
    // These three fields are assigned ahead of every early return below.
    opSendMsg.sendCallback_ = batch.createSendCallback();
    opSendMsg.messagesCount_ = batch.messagesCount();
    opSendMsg.messagesSize_ = batch.messagesSize();

    if (flushCallback) {
        // Chain the flush callback behind the batch's own send callback.
        auto batchCallback = opSendMsg.sendCallback_;
        opSendMsg.sendCallback_ = [batchCallback, flushCallback](Result sendResult,
                                                                 const MessageId& messageId) {
            batchCallback(sendResult, messageId);
            flushCallback(sendResult);
        };
    }

    if (batch.empty()) {
        return ResultOperationNotSupported;
    }

    MessageImplPtr msg = batch.msgImpl();
    msg->metadata.set_num_messages_in_batch(batch.size());

    const auto codecType = producerConfig_.getCompressionType();
    if (codecType != CompressionNone) {
        msg->metadata.set_compression(static_cast<proto::CompressionType>(codecType));
        // Must be read before encode() replaces the payload.
        msg->metadata.set_uncompressed_size(msg->payload.readableBytes());
    }
    // The codec lookup and encode() run for every compression type, CompressionNone included.
    msg->payload = CompressionCodecProvider::getCodec(codecType).encode(msg->payload);

    auto crypto = msgCryptoWeakPtr_.lock();
    if (crypto && producerConfig_.isEncryptionEnabled()) {
        SharedBuffer cipherText;
        if (!crypto->encrypt(producerConfig_.getEncryptionKeys(), producerConfig_.getCryptoKeyReader(),
                             msg->metadata, msg->payload, cipherText)) {
            return ResultCryptoError;
        }
        msg->payload = cipherText;
    }

    // The size limit is checked against the payload after compression and encryption.
    const auto payloadSize = msg->payload.readableBytes();
    if (payloadSize > ClientConnection::getMaxMessageSize()) {
        return ResultMessageTooBig;
    }

    opSendMsg.metadata_ = msg->metadata;
    opSendMsg.payload_ = msg->payload;
    opSendMsg.sequenceId_ = msg->metadata.sequence_id();
    opSendMsg.producerId_ = producerId_;
    opSendMsg.timeout_ = TimeUtils::now() + milliseconds(producerConfig_.getSendTimeout());
    return ResultOk;
}