Result ConsumerImpl::handleCreateConsumer()

in lib/ConsumerImpl.cc [307:368]


/**
 * Handle the outcome of a create-consumer request issued on the given connection.
 *
 * On success (result == ResultOk) the consumer state is reset under mutex_, the
 * connection is recorded, flow permits are sent to the broker and
 * consumerCreatedPromise_ is completed with this consumer.
 *
 * On failure:
 *  - if the request timed out, a CloseConsumer request is sent on the same connection;
 *  - if consumerCreatedPromise_ is already complete (i.e. this was a reconnection),
 *    ResultRetryable is returned;
 *  - otherwise the result is passed through convertToTimeoutIfNecessary(); when the
 *    converted result is not retryable, the promise is failed and state_ becomes Failed.
 *
 * @param cnx the connection on which the create-consumer request was sent
 * @param result the result of the create-consumer request
 * @return ResultOk on success, ResultRetryable when a reconnection attempt failed, or
 *         the (possibly converted) error result for a failed initial creation
 */
Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) {
    Result handleResult = ResultOk;

    if (result == ResultOk) {
        LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
        {
            Lock mutexLock(mutex_);
            setCnx(cnx);
            // Drop everything buffered for the previous connection before marking the consumer ready.
            incomingMessages_.clear();
            possibleSendToDeadLetterTopicMessages_.clear();
            state_ = Ready;
            backoff_.reset();
            if (!messageListener_ && config_.getReceiverQueueSize() == 0) {
                // Zero receiver queue size without a listener: permits are only re-sent for
                // receive calls that are still outstanding.
                // Complicated logic since we don't have a isLocked() function for mutex
                if (waitingForZeroQueueSizeMessage) {
                    sendFlowPermitsToBroker(cnx, 1);
                }
                // Note that the order of lock acquisition must be mutex_ -> pendingReceiveMutex_,
                // otherwise a deadlock will occur.
                Lock pendingReceiveMutexLock(pendingReceiveMutex_);
                if (!pendingReceives_.empty()) {
                    // One permit per pending receive.
                    sendFlowPermitsToBroker(cnx, pendingReceives_.size());
                }
            }
            availablePermits_ = 0;
        }

        LOG_DEBUG(getName() << "Send initial flow permits: " << config_.getReceiverQueueSize());
        if (config_.getReceiverQueueSize() != 0) {
            sendFlowPermitsToBroker(cnx, config_.getReceiverQueueSize());
        } else if (messageListener_) {
            // Zero queue size with a listener: request a single message.
            sendFlowPermitsToBroker(cnx, 1);
        }
        consumerCreatedPromise_.setValue(get_shared_this_ptr());
    } else {
        if (result == ResultTimeout) {
            // Creating the consumer has timed out. We need to ensure the broker closes the consumer
            // in case it was indeed created, otherwise it might prevent new subscribe operation,
            // since we are not closing the connection
            //
            // NOTE(review): client_ is accessed through lock(), so it is presumably a weak
            // reference; lock() can then yield an empty pointer once the client is gone.
            // Guard against that instead of dereferencing unconditionally.
            auto client = client_.lock();
            if (client) {
                int requestId = client->newRequestId();
                cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
            } else {
                LOG_WARN(getName() << "Client is no longer available, skip closing the consumer after "
                                      "create-consumer timeout");
            }
        }

        if (consumerCreatedPromise_.isComplete()) {
            // Consumer had already been initially created, we need to retry connecting in any case
            LOG_WARN(getName() << "Failed to reconnect consumer: " << strResult(result));
            handleResult = ResultRetryable;
        } else {
            // Consumer was not yet created, retry to connect to broker if it's possible
            handleResult = convertToTimeoutIfNecessary(result, creationTimestamp_);
            if (isResultRetryable(handleResult)) {
                LOG_WARN(getName() << "Temporary error in creating consumer: " << strResult(handleResult));
            } else {
                LOG_ERROR(getName() << "Failed to create consumer: " << strResult(handleResult));
                consumerCreatedPromise_.setFailed(handleResult);
                state_ = Failed;
            }
        }
    }

    return handleResult;
}