void PartitionedProducerImpl::sendAsync()

in lib/PartitionedProducerImpl.cc [201:245]


void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
    if (state_ != Ready) {
        if (callback) {
            callback(ResultAlreadyClosed, msg.getMessageId());
        }
        return;
    }

    // get partition for this message from router policy
    Lock producersLock(producersMutex_);
    short partition = (short)(routerPolicy_->getPartition(msg, *topicMetadata_));
    if (partition >= getNumPartitions() || partition >= producers_.size()) {
        LOG_ERROR("Got Invalid Partition for message from Router Policy, Partition - " << partition);
        // change me: abort or notify failure in callback?
        //          change to appropriate error if callback
        if (callback) {
            callback(ResultUnknownError, msg.getMessageId());
        }
        return;
    }
    // find a producer for that partition, index should start from 0
    ProducerImplPtr producer = producers_[partition];

    // if the producer is not started (lazy producer), then kick-off the start process
    if (!producer->isStarted()) {
        producer->start();
    }

    producersLock.unlock();

    // send message on that partition
    if (!conf_.getLazyStartPartitionedProducers() || producer->ready()) {
        producer->sendAsync(msg, std::move(callback));
    } else {
        // Wrapping the callback into a lambda has overhead, so we check if the producer is ready first
        producer->getProducerCreatedFuture().addListener(
            [msg, callback](Result result, ProducerImplBaseWeakPtr weakProducer) {
                if (result == ResultOk) {
                    weakProducer.lock()->sendAsync(msg, std::move(callback));
                } else if (callback) {
                    callback(result, {});
                }
            });
    }
}