in lib/ConsumerImpl.cc [307:368]
/**
 * Handle the outcome of the create-consumer request issued on `cnx`.
 *
 * On success (`result == ResultOk`), while holding `mutex_`, the consumer is bound to
 * the connection, the locally buffered messages are dropped, the state becomes `Ready`,
 * the backoff is reset and `availablePermits_` is zeroed. Flow permits are then sent to
 * the broker and `consumerCreatedPromise_` is completed with this consumer.
 *
 * On failure, a timed-out request is followed by a close-consumer command. The return
 * value then depends on whether `consumerCreatedPromise_` was already completed.
 *
 * @param cnx the connection on which the create-consumer request was sent
 * @param result the result of the create-consumer request
 * @return `ResultOk` on success; `ResultRetryable` when the failure happened after the
 *         consumer had already been created once; otherwise the value returned by
 *         `convertToTimeoutIfNecessary(result, creationTimestamp_)`
 */
Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) {
    Result handleResult = ResultOk;
    if (result == ResultOk) {
        LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
        {
            Lock mutexLock(mutex_);
            setCnx(cnx);
            incomingMessages_.clear();
            possibleSendToDeadLetterTopicMessages_.clear();
            state_ = Ready;
            backoff_.reset();
            if (!messageListener_ && config_.getReceiverQueueSize() == 0) {
                // Zero-size receiver queue without a listener: permits are only requested
                // on demand, so re-request them for receive calls that are still waiting.
                // Complicated logic since we don't have a isLocked() function for mutex
                if (waitingForZeroQueueSizeMessage) {
                    sendFlowPermitsToBroker(cnx, 1);
                }
                // Note that the order of lock acquisition must be mutex_ -> pendingReceiveMutex_,
                // otherwise a deadlock will occur.
                Lock pendingReceiveMutexLock(pendingReceiveMutex_);
                if (!pendingReceives_.empty()) {
                    sendFlowPermitsToBroker(cnx, pendingReceives_.size());
                }
            }
            availablePermits_ = 0;
        }

        LOG_DEBUG(getName() << "Send initial flow permits: " << config_.getReceiverQueueSize());
        if (config_.getReceiverQueueSize() != 0) {
            sendFlowPermitsToBroker(cnx, config_.getReceiverQueueSize());
        } else if (messageListener_) {
            // Zero-size queue with a listener: ask for a single message.
            sendFlowPermitsToBroker(cnx, 1);
        }
        consumerCreatedPromise_.setValue(get_shared_this_ptr());
    } else {
        if (result == ResultTimeout) {
            // Creating the consumer has timed out. We need to ensure the broker closes the consumer
            // in case it was indeed created, otherwise it might prevent new subscribe operation,
            // since we are not closing the connection
            //
            // client_.lock() yields an empty pointer once the client has been destroyed, so
            // it must be checked before use instead of being dereferenced directly.
            if (auto client = client_.lock()) {
                int requestId = client->newRequestId();
                cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
            } else {
                LOG_WARN(getName() << "Client is no longer available, skip closing the timed out consumer");
            }
        }

        if (consumerCreatedPromise_.isComplete()) {
            // Consumer had already been initially created, we need to retry connecting in any case
            LOG_WARN(getName() << "Failed to reconnect consumer: " << strResult(result));
            handleResult = ResultRetryable;
        } else {
            // Consumer was not yet created, retry to connect to broker if it's possible
            handleResult = convertToTimeoutIfNecessary(result, creationTimestamp_);
            if (isResultRetryable(handleResult)) {
                LOG_WARN(getName() << "Temporary error in creating consumer: " << strResult(handleResult));
            } else {
                LOG_ERROR(getName() << "Failed to create consumer: " << strResult(handleResult));
                consumerCreatedPromise_.setFailed(handleResult);
                state_ = Failed;
            }
        }
    }
    return handleResult;
}