in lib/ConsumerImpl.cc [276:332]
void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) {
static bool firstTime = true;
if (result == ResultOk) {
if (firstTime) {
firstTime = false;
}
LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
{
Lock lock(mutex_);
setCnx(cnx);
incomingMessages_.clear();
possibleSendToDeadLetterTopicMessages_.clear();
state_ = Ready;
backoff_.reset();
// Complicated logic since we don't have a isLocked() function for mutex
if (waitingForZeroQueueSizeMessage) {
sendFlowPermitsToBroker(cnx, 1);
}
availablePermits_ = 0;
}
LOG_DEBUG(getName() << "Send initial flow permits: " << config_.getReceiverQueueSize());
if (consumerTopicType_ == NonPartitioned || !firstTime) {
if (config_.getReceiverQueueSize() != 0) {
sendFlowPermitsToBroker(cnx, config_.getReceiverQueueSize());
} else if (messageListener_) {
sendFlowPermitsToBroker(cnx, 1);
}
}
consumerCreatedPromise_.setValue(get_shared_this_ptr());
} else {
if (result == ResultTimeout) {
// Creating the consumer has timed out. We need to ensure the broker closes the consumer
// in case it was indeed created, otherwise it might prevent new subscribe operation,
// since we are not closing the connection
int requestId = client_.lock()->newRequestId();
cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
}
if (consumerCreatedPromise_.isComplete()) {
// Consumer had already been initially created, we need to retry connecting in any case
LOG_WARN(getName() << "Failed to reconnect consumer: " << strResult(result));
scheduleReconnection(get_shared_this_ptr());
} else {
// Consumer was not yet created, retry to connect to broker if it's possible
result = convertToTimeoutIfNecessary(result, creationTimestamp_);
if (result == ResultRetryable) {
LOG_WARN(getName() << "Temporary error in creating consumer: " << strResult(result));
scheduleReconnection(get_shared_this_ptr());
} else {
LOG_ERROR(getName() << "Failed to create consumer: " << strResult(result));
consumerCreatedPromise_.setFailed(result);
state_ = Failed;
}
}
}
}