in lib/ProducerImpl.cc [726:788]
void ProducerImpl::closeAsync(CloseCallback originalCallback) {
auto callback = [this, originalCallback](Result result) {
if (result == ResultOk) {
LOG_INFO(getName() << "Closed producer " << producerId_);
shutdown();
} else {
LOG_ERROR(getName() << "Failed to close producer: " << strResult(result));
}
if (originalCallback) {
originalCallback(result);
}
};
Lock lock(mutex_);
// if the producer was never started then there is nothing to clean up
State expectedState = NotStarted;
if (state_.compare_exchange_strong(expectedState, Closed)) {
callback(ResultOk);
return;
}
cancelTimers();
if (semaphore_) {
semaphore_->close();
}
// ensure any remaining send callbacks are called before calling the close callback
failPendingMessages(ResultAlreadyClosed, false);
// TODO maybe we need a loop here to implement CAS for a condition,
// just like Java's `getAndUpdate` method on an atomic variable
const auto state = state_.load();
if (state != Ready && state != Pending) {
callback(ResultAlreadyClosed);
return;
}
LOG_INFO(getName() << "Closing producer for topic " << topic_);
state_ = Closing;
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
callback(ResultOk);
return;
}
// Detach the producer from the connection to avoid sending any other
// message from the producer
resetCnx();
ClientImplPtr client = client_.lock();
if (!client) {
callback(ResultOk);
return;
}
int requestId = client->newRequestId();
auto self = shared_from_this();
cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId)
.addListener([self, callback](Result result, const ResponseData&) { callback(result); });
}