in lib/ProducerImpl.cc [384:421]
void ProducerImpl::flushAsync(FlushCallback callback) {
if (state_ != Ready) {
if (callback) {
callback(ResultAlreadyClosed);
}
return;
}
auto addCallbackToLastOp = [this, &callback] {
if (pendingMessagesQueue_.empty()) {
return false;
}
pendingMessagesQueue_.back()->addTrackerCallback(callback);
return true;
};
if (batchMessageContainer_) {
Lock lock(mutex_);
if (batchMessageContainer_->isEmpty()) {
if (!addCallbackToLastOp() && callback) {
lock.unlock();
callback(ResultOk);
}
return;
}
auto failures = batchMessageAndSend(callback);
lock.unlock();
failures.complete();
} else {
Lock lock(mutex_);
if (!addCallbackToLastOp() && callback) {
lock.unlock();
callback(ResultOk);
}
}
}