void ProducerImpl::closeAsync()

in lib/ProducerImpl.cc [726:788]


void ProducerImpl::closeAsync(CloseCallback originalCallback) {
    auto callback = [this, originalCallback](Result result) {
        if (result == ResultOk) {
            LOG_INFO(getName() << "Closed producer " << producerId_);
            shutdown();
        } else {
            LOG_ERROR(getName() << "Failed to close producer: " << strResult(result));
        }
        if (originalCallback) {
            originalCallback(result);
        }
    };

    Lock lock(mutex_);

    // if the producer was never started then there is nothing to clean up
    State expectedState = NotStarted;
    if (state_.compare_exchange_strong(expectedState, Closed)) {
        callback(ResultOk);
        return;
    }

    cancelTimers();

    if (semaphore_) {
        semaphore_->close();
    }

    // ensure any remaining send callbacks are called before calling the close callback
    failPendingMessages(ResultAlreadyClosed, false);

    // TODO  maybe we need a loop here to implement CAS for a condition,
    // just like Java's `getAndUpdate` method on an atomic variable
    const auto state = state_.load();
    if (state != Ready && state != Pending) {
        callback(ResultAlreadyClosed);

        return;
    }
    LOG_INFO(getName() << "Closing producer for topic " << topic_);
    state_ = Closing;

    ClientConnectionPtr cnx = getCnx().lock();
    if (!cnx) {
        callback(ResultOk);
        return;
    }

    // Detach the producer from the connection to avoid sending any other
    // message from the producer
    resetCnx();

    ClientImplPtr client = client_.lock();
    if (!client) {
        callback(ResultOk);
        return;
    }

    int requestId = client->newRequestId();
    auto self = shared_from_this();
    cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId)
        .addListener([self, callback](Result result, const ResponseData&) { callback(result); });
}