in pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java [530:681]
/**
 * Asynchronously sends one message, compressing it and splitting it into chunks when required.
 *
 * <p>Processing order:
 * <ol>
 *   <li>Check the producer state and reserve room in the pending queue for {@code uncompressedSize} bytes.</li>
 *   <li>For messages that are sent individually (batching disabled, or a deliver-at time is set): compress
 *       the payload when it is larger than {@code conf.getCompressMinMsgBodySize()}, then validate the
 *       resulting size against {@code getMaxMessageSize()} unless chunking is enabled. The validation runs
 *       whether or not the payload was compressed.</li>
 *   <li>Reject a non-replicated message that already carries a producer name (a reused message), and stop
 *       if the schema cannot be populated.</li>
 *   <li>Update the metadata, compute the chunk layout, acquire the extra send permits needed for the
 *       additional chunks, and serialize and send every chunk.</li>
 * </ol>
 *
 * <p>Each early-return path before sending releases the buffer this method currently holds
 * ({@code payload} or {@code compressedPayload}).
 *
 * @param message  the message to send; must be a {@link MessageImpl}, otherwise {@code checkArgument} fails
 * @param callback passed to every helper that can fail the send; it receives the failure when the message
 *                 is rejected here
 */
public void sendAsync(Message<?> message, SendCallback callback) {
    checkArgument(message instanceof MessageImpl);
    MessageImpl<?> msg = (MessageImpl<?>) message;
    MessageMetadata msgMetadata = msg.getMessageBuilder();
    ByteBuf payload = msg.getDataBuffer();
    final int uncompressedSize = payload.readableBytes();
    if (!isValidProducerState(callback, message.getSequenceId())) {
        payload.release();
        return;
    }
    if (!canEnqueueRequest(callback, message.getSequenceId(), uncompressedSize)) {
        payload.release();
        return;
    }
    // If compression is enabled, we are compressing, otherwise it will simply use the same buffer
    ByteBuf compressedPayload = payload;
    boolean compressed = false;
    // Batch will be compressed when closed
    // If a message has a delayed delivery time, we'll always send it individually
    if (!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()) {
        // NOTE(review): a payload whose size equals the configured minimum is NOT compressed here;
        // confirm whether the threshold is meant to be inclusive.
        if (payload.readableBytes() > conf.getCompressMinMsgBodySize()) {
            compressedPayload = applyCompression(payload);
            compressed = true;
        }
        // validate msg-size (For batching this will be check at the batch completion size).
        // This must run for every individually-sent message, including payloads that were left
        // uncompressed because they are below the compression threshold.
        int compressedSize = compressedPayload.readableBytes();
        if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
            compressedPayload.release();
            // Only describe the message as compressed when compression was actually applied.
            String compressedStr = compressed && conf.getCompressionType() != CompressionType.NONE
                    ? ("compressed (" + conf.getCompressionType() + ")")
                    : "uncompressed";
            PulsarClientException.InvalidMessageException invalidMessageException =
                    new PulsarClientException.InvalidMessageException(
                            format("The producer %s of the topic %s sends a %s message with %d bytes that "
                                            + "exceeds %d bytes",
                                    producerName, topic, compressedStr, compressedSize,
                                    getMaxMessageSize()));
            completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
            return;
        }
    }
    // A non-replicated message that already has a producer name has been sent before.
    if (!msg.isReplicated() && msgMetadata.hasProducerName()) {
        PulsarClientException.InvalidMessageException invalidMessageException =
                new PulsarClientException.InvalidMessageException(
                        format("The producer %s of the topic %s can not reuse the same message", producerName,
                                topic),
                        msg.getSequenceId());
        completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
        compressedPayload.release();
        return;
    }
    if (!populateMessageSchema(msg, callback)) {
        compressedPayload.release();
        return;
    }
    // Update the message metadata before computing the payload chunk size to avoid a large message cannot be split
    // into chunks.
    updateMessageMetadata(msgMetadata, uncompressedSize, compressed);
    // send in chunks
    int totalChunks;
    int payloadChunkSize;
    if (canAddToBatch(msg) || !conf.isChunkingEnabled()) {
        totalChunks = 1;
        payloadChunkSize = getMaxMessageSize();
    } else {
        // Reserve current metadata size for chunk size to avoid message size overflow.
        // NOTE: this is not strictly bounded, as metadata will be updated after chunking.
        // So there is a small chance that the final message size is larger than ClientCnx.getMaxMessageSize().
        // But it won't cause produce failure as broker have 10 KB padding space for these cases.
        payloadChunkSize = getMaxMessageSize() - msgMetadata.getSerializedSize();
        if (payloadChunkSize <= 0) {
            PulsarClientException.InvalidMessageException invalidMessageException =
                    new PulsarClientException.InvalidMessageException(
                            format("The producer %s of the topic %s sends a message with %d bytes metadata that "
                                            + "exceeds %d bytes", producerName, topic,
                                    msgMetadata.getSerializedSize(), getMaxMessageSize()));
            completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
            compressedPayload.release();
            return;
        }
        payloadChunkSize = Math.min(chunkMaxMessageSize, payloadChunkSize);
        // Math.max(1, ...) keeps an empty payload at exactly one chunk.
        totalChunks = MathUtils.ceilDiv(Math.max(1, compressedPayload.readableBytes()), payloadChunkSize);
    }
    // chunked message also sent individually so, try to acquire send-permits
    // (non-blocking mode only: all additional permits are taken up front, before anything is sent)
    for (int i = 0; i < (totalChunks - 1); i++) {
        if (!conf.isBlockIfQueueFull() && !canEnqueueRequest(callback, message.getSequenceId(),
                0 /* The memory was already reserved */)) {
            compressedPayload.release();
            client.getMemoryLimitController().releaseMemory(uncompressedSize);
            // Gives back i + 1 permits: presumably the one taken by the first canEnqueueRequest call
            // plus the i taken by the earlier iterations of this loop.
            semaphoreRelease(i + 1);
            return;
        }
    }
    try {
        int readStartIndex = 0;
        ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ? ChunkedMessageCtx.get(totalChunks) : null;
        // Remember the byte-array fields so they can be re-applied to the metadata for each later chunk.
        byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion()
                ? msg.getMessageBuilder().getSchemaVersion() : null;
        byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey()
                ? msg.getMessageBuilder().getOrderingKey() : null;
        // msg.messageId will be reset if previous message chunk is sent successfully.
        final MessageId messageId = msg.getMessageId();
        for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
            // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
            // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
            // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
            // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
            if (chunkId > 0) {
                if (schemaVersion != null) {
                    msg.getMessageBuilder().setSchemaVersion(schemaVersion);
                }
                if (orderingKey != null) {
                    msg.getMessageBuilder().setOrderingKey(orderingKey);
                }
            }
            // Blocking mode: the permit for each additional chunk is requested right before that chunk is sent.
            if (chunkId > 0 && conf.isBlockIfQueueFull() && !canEnqueueRequest(callback,
                    message.getSequenceId(), 0 /* The memory was already reserved */)) {
                compressedPayload.release();
                client.getMemoryLimitController().releaseMemory(uncompressedSize - readStartIndex);
                semaphoreRelease(totalChunks - chunkId);
                return;
            }
            synchronized (this) {
                // Update the message metadata before computing the payload chunk size
                // to avoid a large message cannot be split into chunks.
                final long sequenceId = updateMessageMetadataSequenceId(msgMetadata);
                String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
                serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,
                        readStartIndex, payloadChunkSize, compressedPayload, compressed,
                        compressedPayload.readableBytes(), callback, chunkedMessageCtx, messageId);
                // Offset in the (compressed) payload at which the next chunk starts.
                readStartIndex = ((chunkId + 1) * payloadChunkSize);
            }
        }
    } catch (PulsarClientException e) {
        e.setSequenceId(msg.getSequenceId());
        completeCallbackAndReleaseSemaphore(uncompressedSize, callback, e);
    } catch (Throwable t) {
        // Wrap anything unexpected so the callback always receives a PulsarClientException.
        completeCallbackAndReleaseSemaphore(uncompressedSize, callback,
                new PulsarClientException(t, msg.getSequenceId()));
    }
}