in lib/PartitionedProducerImpl.cc [201:245]
void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
if (state_ != Ready) {
if (callback) {
callback(ResultAlreadyClosed, msg.getMessageId());
}
return;
}
// get partition for this message from router policy
Lock producersLock(producersMutex_);
short partition = (short)(routerPolicy_->getPartition(msg, *topicMetadata_));
if (partition >= getNumPartitions() || partition >= producers_.size()) {
LOG_ERROR("Got Invalid Partition for message from Router Policy, Partition - " << partition);
// change me: abort or notify failure in callback?
// change to appropriate error if callback
if (callback) {
callback(ResultUnknownError, msg.getMessageId());
}
return;
}
// find a producer for that partition, index should start from 0
ProducerImplPtr producer = producers_[partition];
// if the producer is not started (lazy producer), then kick-off the start process
if (!producer->isStarted()) {
producer->start();
}
producersLock.unlock();
// send message on that partition
if (!conf_.getLazyStartPartitionedProducers() || producer->ready()) {
producer->sendAsync(msg, std::move(callback));
} else {
// Wrapping the callback into a lambda has overhead, so we check if the producer is ready first
producer->getProducerCreatedFuture().addListener(
[msg, callback](Result result, ProducerImplBaseWeakPtr weakProducer) {
if (result == ResultOk) {
weakProducer.lock()->sendAsync(msg, std::move(callback));
} else if (callback) {
callback(result, {});
}
});
}
}