void ConsumerImpl::handleCreateConsumer()

in lib/ConsumerImpl.cc [276:332]


void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) {
    static bool firstTime = true;
    if (result == ResultOk) {
        if (firstTime) {
            firstTime = false;
        }
        LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
        {
            Lock lock(mutex_);
            setCnx(cnx);
            incomingMessages_.clear();
            possibleSendToDeadLetterTopicMessages_.clear();
            state_ = Ready;
            backoff_.reset();
            // Complicated logic since we don't have a isLocked() function for mutex
            if (waitingForZeroQueueSizeMessage) {
                sendFlowPermitsToBroker(cnx, 1);
            }
            availablePermits_ = 0;
        }

        LOG_DEBUG(getName() << "Send initial flow permits: " << config_.getReceiverQueueSize());
        if (consumerTopicType_ == NonPartitioned || !firstTime) {
            if (config_.getReceiverQueueSize() != 0) {
                sendFlowPermitsToBroker(cnx, config_.getReceiverQueueSize());
            } else if (messageListener_) {
                sendFlowPermitsToBroker(cnx, 1);
            }
        }
        consumerCreatedPromise_.setValue(get_shared_this_ptr());
    } else {
        if (result == ResultTimeout) {
            // Creating the consumer has timed out. We need to ensure the broker closes the consumer
            // in case it was indeed created, otherwise it might prevent new subscribe operation,
            // since we are not closing the connection
            int requestId = client_.lock()->newRequestId();
            cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
        }

        if (consumerCreatedPromise_.isComplete()) {
            // Consumer had already been initially created, we need to retry connecting in any case
            LOG_WARN(getName() << "Failed to reconnect consumer: " << strResult(result));
            scheduleReconnection(get_shared_this_ptr());
        } else {
            // Consumer was not yet created, retry to connect to broker if it's possible
            result = convertToTimeoutIfNecessary(result, creationTimestamp_);
            if (result == ResultRetryable) {
                LOG_WARN(getName() << "Temporary error in creating consumer: " << strResult(result));
                scheduleReconnection(get_shared_this_ptr());
            } else {
                LOG_ERROR(getName() << "Failed to create consumer: " << strResult(result));
                consumerCreatedPromise_.setFailed(result);
                state_ = Failed;
            }
        }
    }
}