void ProducerImpl::flushAsync()

in lib/ProducerImpl.cc [384:421]


/**
 * Asynchronously flush this producer and report the outcome through `callback`.
 *
 * Behavior, in order:
 *  - If the producer is not in the Ready state, `callback` (when non-empty) is
 *    invoked immediately with ResultAlreadyClosed.
 *  - If a batch container exists and holds messages, the batch is handed to
 *    batchMessageAndSend() together with `callback`.
 *  - Otherwise nothing is buffered locally: when the pending queue is non-empty
 *    the callback is attached to its last op; when it is empty the callback is
 *    invoked immediately with ResultOk.
 *
 * An empty `callback` is accepted: it is never invoked here and never attached
 * to a pending op.
 *
 * @param callback completion callback; may be empty.
 */
void ProducerImpl::flushAsync(FlushCallback callback) {
    if (state_ != Ready) {
        if (callback) {
            callback(ResultAlreadyClosed);
        }
        return;
    }

    Lock lock(mutex_);

    if (batchMessageContainer_ && !batchMessageContainer_->isEmpty()) {
        // NOTE(review): batchMessageAndSend() is defined elsewhere; it is called
        // with mutex_ held and its result is completed only after unlocking.
        auto failures = batchMessageAndSend(callback);
        lock.unlock();
        failures.complete();
        return;
    }

    // No batched messages to send (either batching is not in use or the batch
    // container is empty), so completion depends only on the pending queue.
    if (!pendingMessagesQueue_.empty()) {
        // Attach the callback to the most recently queued op. Skip an empty
        // callback: every direct invocation in this function is guarded against
        // emptiness, and the deferred invocation must not be handed one either.
        if (callback) {
            pendingMessagesQueue_.back()->addTrackerCallback(callback);
        }
        return;
    }

    // Nothing is pending. Release the mutex before running the callback so
    // that it does not execute while mutex_ is held.
    lock.unlock();
    if (callback) {
        callback(ResultOk);
    }
}