public void sendAsync()

in pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java [530:681]


    /**
     * Publishes {@code message} asynchronously; failures detected in this method are reported through
     * {@code callback} rather than thrown.
     *
     * <p>The method proceeds in the following order:
     * <ol>
     *   <li>checks the producer state and asks {@code canEnqueueRequest} to admit the uncompressed payload;</li>
     *   <li>compresses eagerly when the message is not going to be batched (batching disabled, or the message has
     *       a deliver-at time) and the payload exceeds the configured minimum size, rejecting a result that is
     *       larger than the maximum message size when chunking is disabled;</li>
     *   <li>rejects a non-replicated message whose metadata already carries a producer name, then populates the
     *       message schema;</li>
     *   <li>updates the metadata, derives the chunk size and chunk count, acquires the additional per-chunk
     *       permits and hands every chunk to {@code serializeAndSendMessage}.</li>
     * </ol>
     *
     * <p>Each early {@code return} below releases the buffer it holds at that point (the original payload
     * before compression, the possibly-compressed buffer afterwards).
     *
     * @param message  the message to publish; must be a {@link MessageImpl}, which is enforced with
     *                 {@code checkArgument}
     * @param callback the callback handed to the helper methods that complete the send attempt
     */
    public void sendAsync(Message<?> message, SendCallback callback) {
        checkArgument(message instanceof MessageImpl);
        MessageImpl<?> msg = (MessageImpl<?>) message;
        MessageMetadata msgMetadata = msg.getMessageBuilder();
        ByteBuf payload = msg.getDataBuffer();
        final int uncompressedSize = payload.readableBytes();

        // Two admission gates, evaluated in this order (the second only runs when the first passes).
        // When either one refuses the message, the payload buffer is released here.
        if (!isValidProducerState(callback, message.getSequenceId())
                || !canEnqueueRequest(callback, message.getSequenceId(), uncompressedSize)) {
            payload.release();
            return;
        }

        // Without compression the outgoing buffer is simply the original payload.
        ByteBuf outgoing = payload;
        boolean compressed = false;
        // Batched messages are compressed when the batch is closed, so only messages that bypass batching are
        // compressed here. A message with a delayed delivery time is always sent individually.
        final boolean sentIndividually = !isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime();
        if (sentIndividually && payload.readableBytes() > conf.getCompressMinMsgBodySize()) {
            outgoing = applyCompression(payload);
            compressed = true;

            // Size validation for individually sent messages (batches are validated when the batch completes).
            final int compressedSize = outgoing.readableBytes();
            if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
                outgoing.release();
                final String sizeKind;
                if (conf.getCompressionType() != CompressionType.NONE) {
                    sizeKind = "compressed (" + conf.getCompressionType() + ")";
                } else {
                    sizeKind = "uncompressed";
                }
                completeCallbackAndReleaseSemaphore(uncompressedSize, callback,
                        new PulsarClientException.InvalidMessageException(
                                format("The producer %s of the topic %s sends a %s message with %d bytes that "
                                                + "exceeds %d bytes",
                                        producerName, topic, sizeKind, compressedSize, getMaxMessageSize())));
                return;
            }
        }

        // Metadata that already names a producer means this message object was sent before; only replicated
        // messages are allowed through in that state.
        if (!msg.isReplicated() && msgMetadata.hasProducerName()) {
            completeCallbackAndReleaseSemaphore(uncompressedSize, callback,
                    new PulsarClientException.InvalidMessageException(
                            format("The producer %s of the topic %s can not reuse the same message", producerName,
                                    topic),
                            msg.getSequenceId()));
            outgoing.release();
            return;
        }

        if (!populateMessageSchema(msg, callback)) {
            outgoing.release();
            return;
        }

        // The metadata has to be final-ish before the chunk size is derived from its serialized size; otherwise a
        // large message might not be splittable into chunks.
        updateMessageMetadata(msgMetadata, uncompressedSize, compressed);

        // Decide between a single send and a chunked send.
        final int totalChunks;
        final int payloadChunkSize;
        if (!canAddToBatch(msg) && conf.isChunkingEnabled()) {
            // Leave room for the current metadata in every chunk. This bound is not strict: the metadata is
            // updated again after chunking, so the final size can slightly exceed the maximum. Per the original
            // note, the broker keeps about 10 KB of padding for that case.
            int roomAfterMetadata = getMaxMessageSize() - msgMetadata.getSerializedSize();
            if (roomAfterMetadata <= 0) {
                completeCallbackAndReleaseSemaphore(uncompressedSize, callback,
                        new PulsarClientException.InvalidMessageException(
                                format("The producer %s of the topic %s sends a message with %d bytes metadata that "
                                                + "exceeds %d bytes", producerName, topic,
                                        msgMetadata.getSerializedSize(), getMaxMessageSize())));
                outgoing.release();
                return;
            }
            payloadChunkSize = Math.min(chunkMaxMessageSize, roomAfterMetadata);
            // Math.max(1, ...) keeps an empty payload at one chunk instead of zero.
            totalChunks = MathUtils.ceilDiv(Math.max(1, outgoing.readableBytes()), payloadChunkSize);
        } else {
            totalChunks = 1;
            payloadChunkSize = getMaxMessageSize();
        }

        // Every chunk is sent as its own message, so each chunk beyond the first needs its own permit. In
        // non-blocking mode they are all acquired up front; `acquired` counts the permits held so far, including
        // the one obtained at the top of the method.
        for (int acquired = 1; acquired < totalChunks; acquired++) {
            if (conf.isBlockIfQueueFull()) {
                // Blocking mode acquires the permit right before each chunk is sent (see the loop below).
                continue;
            }
            if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) {
                outgoing.release();
                client.getMemoryLimitController().releaseMemory(uncompressedSize);
                semaphoreRelease(acquired);
                return;
            }
        }

        try {
            final boolean multiChunk = totalChunks > 1;
            int chunkOffset = 0;
            ChunkedMessageCtx chunkedMessageCtx = null;
            byte[] savedSchemaVersion = null;
            byte[] savedOrderingKey = null;
            if (multiChunk) {
                chunkedMessageCtx = ChunkedMessageCtx.get(totalChunks);
                if (msg.getMessageBuilder().hasSchemaVersion()) {
                    savedSchemaVersion = msg.getMessageBuilder().getSchemaVersion();
                }
                if (msg.getMessageBuilder().hasOrderingKey()) {
                    savedOrderingKey = msg.getMessageBuilder().getOrderingKey();
                }
            }
            // Captured once up front: msg.messageId is reset after a previous chunk has been sent successfully.
            final MessageId messageId = msg.getMessageId();
            for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
                if (chunkId > 0) {
                    // Per the original author's note, the schema version inside `MessageMetadata` is backed by a
                    // ByteBuf, so it has to be set again before the same metadata is re-serialized for the next
                    // SEND command. The ordering key gets the same treatment.
                    if (savedSchemaVersion != null) {
                        msg.getMessageBuilder().setSchemaVersion(savedSchemaVersion);
                    }
                    if (savedOrderingKey != null) {
                        msg.getMessageBuilder().setOrderingKey(savedOrderingKey);
                    }
                    // Blocking mode: acquire the permit for this chunk now.
                    if (conf.isBlockIfQueueFull() && !canEnqueueRequest(callback,
                            message.getSequenceId(), 0 /* The memory was already reserved */)) {
                        outgoing.release();
                        // NOTE(review): chunkOffset is an offset into the (possibly compressed) outgoing buffer,
                        // while the reservation was made with uncompressedSize -- confirm the units line up.
                        client.getMemoryLimitController().releaseMemory(uncompressedSize - chunkOffset);
                        // NOTE(review): confirm this permit count against what has actually been acquired.
                        semaphoreRelease(totalChunks - chunkId);
                        return;
                    }
                }
                synchronized (this) {
                    final long sequenceId = updateMessageMetadataSequenceId(msgMetadata);
                    String uuid = null;
                    if (multiChunk) {
                        uuid = String.format("%s-%d", producerName, sequenceId);
                    }

                    serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,
                            chunkOffset, payloadChunkSize, outgoing, compressed,
                            outgoing.readableBytes(), callback, chunkedMessageCtx, messageId);
                    // Equivalent to (chunkId + 1) * payloadChunkSize.
                    chunkOffset += payloadChunkSize;
                }
            }
        } catch (PulsarClientException clientError) {
            clientError.setSequenceId(msg.getSequenceId());
            completeCallbackAndReleaseSemaphore(uncompressedSize, callback, clientError);
        } catch (Throwable unexpected) {
            completeCallbackAndReleaseSemaphore(uncompressedSize, callback,
                    new PulsarClientException(unexpected, msg.getSequenceId()));
        }
    }