in lib/ProducerImpl.cc [465:626]
void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallback& callback) {
if (!isValidProducerState(callback)) {
return;
}
// Convert the payload before sending the message.
msg.impl_->convertKeyValueToPayload(conf_.getSchema());
const auto& uncompressedPayload = msg.impl_->payload;
const uint32_t uncompressedSize = uncompressedPayload.readableBytes();
const auto result = canEnqueueRequest(uncompressedSize);
if (result != ResultOk) {
// If queue is full sending the batch immediately, no point waiting till batchMessagetimeout
if (batchMessageContainer_) {
LOG_DEBUG(getName() << " - sending batch message immediately");
Lock lock(mutex_);
auto failures = batchMessageAndSend();
lock.unlock();
failures.complete();
}
callback(result, {});
return;
}
// We have already reserved a spot, so if we need to early return for failed result, we should release the
// semaphore and memory first.
const auto handleFailedResult = [this, uncompressedSize, callback](Result result) {
releaseSemaphore(uncompressedSize); // it releases the memory as well
callback(result, {});
};
auto& msgMetadata = msg.impl_->metadata;
const bool compressed = !canAddToBatch(msg);
const auto payload =
compressed ? applyCompression(uncompressedPayload, conf_.getCompressionType()) : uncompressedPayload;
const auto compressedSize = static_cast<uint32_t>(payload.readableBytes());
const auto maxMessageSize = static_cast<uint32_t>(ClientConnection::getMaxMessageSize());
if (!msgMetadata.has_replicated_from() && msgMetadata.has_producer_name()) {
handleFailedResult(ResultInvalidMessage);
return;
}
Lock lock(mutex_);
uint64_t sequenceId;
if (!msgMetadata.has_sequence_id()) {
sequenceId = msgSequenceGenerator_++;
} else {
sequenceId = msgMetadata.sequence_id();
}
setMessageMetadata(msg, sequenceId, uncompressedSize);
auto payloadChunkSize = maxMessageSize;
int totalChunks;
if (!compressed || !chunkingEnabled_) {
totalChunks = 1;
} else {
const auto metadataSize = static_cast<uint32_t>(msgMetadata.ByteSizeLong());
if (metadataSize >= maxMessageSize) {
LOG_WARN(getName() << " - metadata size " << metadataSize << " cannot exceed " << maxMessageSize
<< " bytes");
handleFailedResult(ResultMessageTooBig);
return;
}
payloadChunkSize = maxMessageSize - metadataSize;
totalChunks = getNumOfChunks(compressedSize, payloadChunkSize);
}
// Each chunk should be sent individually, so try to acquire extra permits for chunks.
for (int i = 0; i < (totalChunks - 1); i++) {
const auto result = canEnqueueRequest(0); // size is 0 because the memory has already reserved
if (result != ResultOk) {
handleFailedResult(result);
return;
}
}
if (canAddToBatch(msg)) {
// Batching is enabled and the message is not delayed
if (!batchMessageContainer_->hasEnoughSpace(msg)) {
batchMessageAndSend().complete();
}
bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg);
bool isFull = batchMessageContainer_->add(msg, callback);
if (isFirstMessage) {
batchTimer_->expires_from_now(
boost::posix_time::milliseconds(conf_.getBatchingMaxPublishDelayMs()));
auto weakSelf = weak_from_this();
batchTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) {
auto self = weakSelf.lock();
if (!self) {
return;
}
if (ec) {
LOG_DEBUG(getName() << " Ignoring timer cancelled event, code[" << ec << "]");
return;
}
LOG_DEBUG(getName() << " - Batch Message Timer expired");
// ignore if the producer is already closing/closed
const auto state = state_.load();
if (state == Pending || state == Ready) {
Lock lock(mutex_);
auto failures = batchMessageAndSend();
lock.unlock();
failures.complete();
}
});
}
if (isFull) {
auto failures = batchMessageAndSend();
lock.unlock();
failures.complete();
}
} else {
const bool sendChunks = (totalChunks > 1);
if (sendChunks) {
msgMetadata.set_uuid(producerName_ + "-" + std::to_string(sequenceId));
msgMetadata.set_num_chunks_from_msg(totalChunks);
msgMetadata.set_total_chunk_msg_size(compressedSize);
}
auto chunkMessageId = totalChunks > 1 ? std::make_shared<ChunkMessageIdImpl>() : nullptr;
int beginIndex = 0;
for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
if (sendChunks) {
msgMetadata.set_chunk_id(chunkId);
}
const uint32_t endIndex = std::min(compressedSize, beginIndex + payloadChunkSize);
auto chunkedPayload = payload.slice(beginIndex, endIndex - beginIndex);
beginIndex = endIndex;
SharedBuffer encryptedPayload;
if (!encryptMessage(msgMetadata, chunkedPayload, encryptedPayload)) {
handleFailedResult(ResultCryptoError);
return;
}
OpSendMsg op{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr,
producerId_, sequenceId, conf_.getSendTimeout(),
1, uncompressedSize, chunkMessageId};
if (!chunkingEnabled_) {
const uint32_t msgMetadataSize = op.metadata_.ByteSize();
const uint32_t payloadSize = op.payload_.readableBytes();
const uint32_t msgHeadersAndPayloadSize = msgMetadataSize + payloadSize;
if (msgHeadersAndPayloadSize > maxMessageSize) {
lock.unlock();
releaseSemaphoreForSendOp(op);
LOG_WARN(getName()
<< " - compressed Message size " << msgHeadersAndPayloadSize << " cannot exceed "
<< maxMessageSize << " bytes unless chunking is enabled");
handleFailedResult(ResultMessageTooBig);
return;
}
}
sendMessage(op);
}
}
}