Result ProducerImpl::handleCreateProducer()

in lib/ProducerImpl.cc [189:310]


/**
 * Processes the outcome of a create-producer request that was sent over `cnx`.
 *
 * All bookkeeping is done while holding `mutex_`; on every path that completes
 * `producerCreatedPromise_` the lock is released first (presumably so that listeners attached to
 * the promise never run under `mutex_` -- confirm against the promise implementation).
 *
 * @param cnx          connection used to resend pending messages and to send close-producer commands
 * @param result       outcome of the create-producer request
 * @param responseData response fields (producer name, schema version, topic epoch, last sequence id);
 *                     only read when `result` is ResultOk
 * @return ResultOk when the producer became Ready;
 *         ResultAlreadyClosed when the producer state is neither Ready nor Pending;
 *         ResultProducerFenced when the request failed with that result;
 *         ResultRetryable when the request failed and either the creation promise was already
 *         completed or `retryOnCreationError_` is set;
 *         otherwise the value of `convertToTimeoutIfNecessary(result, creationTimestamp_)`.
 */
Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
                                          const ResponseData& responseData) {
    Lock lock(mutex_);

    LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << strResult(result));

    // Sends a close-producer command for producerId_ over cnx. Nothing is sent when the owning
    // client can no longer be locked, and the outcome of the request is not inspected.
    const auto askBrokerToCloseProducer = [this, &cnx]() {
        auto client = client_.lock();
        if (!client) {
            return;
        }
        int requestId = client->newRequestId();
        cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
    };

    // closeAsync may have run while this response was in flight (possible with lazy producers),
    // so only a producer that is still Pending or Ready is allowed to proceed.
    const auto currentState = state_.load();
    const bool stillOpen = (currentState == Ready || currentState == Pending);
    if (!stillOpen) {
        LOG_DEBUG("Producer created response received but producer already closed");
        failPendingMessages(ResultAlreadyClosed, false);
        if (result == ResultOk || result == ResultTimeout) {
            // The producer exists (or may exist) on the broker side: ask the broker to drop it.
            askBrokerToCloseProducer();
        }
        if (!producerCreatedPromise_.isComplete()) {
            lock.unlock();
            producerCreatedPromise_.setFailed(ResultAlreadyClosed);
        }
        return ResultAlreadyClosed;
    }

    if (result == ResultOk) {
        // Connected to the broker again: pending messages are resent first, then the connection is
        // published through setCnx() so that new messages go out immediately.
        LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());

        producerName_ = responseData.producerName;
        schemaVersion_ = responseData.schemaVersion;
        producerStr_ = "[" + topic() + ", " + producerName_ + "] ";
        topicEpoch = responseData.topicEpoch;

        // Take over the sequence id from the response only when neither a previous publish nor
        // the configuration has provided one.
        const bool adoptResponseSequenceId =
            lastSequenceIdPublished_ == -1 && conf_.getInitialSequenceId() == -1;
        if (adoptResponseSequenceId) {
            lastSequenceIdPublished_ = responseData.lastSequenceId;
            msgSequenceGenerator_ = lastSequenceIdPublished_ + 1;
        }

        resendMessages(cnx);
        setCnx(cnx);
        state_ = Ready;
        backoff_.reset();

        if (conf_.isEncryptionEnabled()) {
            auto weakSelf = weak_from_this();
            dataKeyRefreshTask_.setCallback([this, weakSelf](const PeriodicTask::ErrorCode& ec) {
                // Holding `self` keeps the producer alive for the duration of the callback; if it
                // cannot be locked the producer is gone and `this` must not be touched.
                auto self = weakSelf.lock();
                if (!self) {
                    return;
                }
                if (ec) {
                    LOG_ERROR("DataKeyRefresh timer failed: " << ec.message());
                    return;
                }
                msgCrypto_->addPublicKeyCipher(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader());
            });
        }

        // A lazily started producer in Shared access mode already has its send timeout timer running.
        const bool lazySharedProducer = conf_.getLazyStartPartitionedProducers() &&
                                        conf_.getAccessMode() == ProducerConfiguration::Shared;
        if (!lazySharedProducer) {
            startSendTimeoutTimer();
        }

        lock.unlock();
        producerCreatedPromise_.setValue(shared_from_this());
        return ResultOk;
    }

    // ---- From here on the create-producer request has failed ----

    if (result == ResultTimeout) {
        // The request timed out but the broker may still have created the producer. Because the
        // connection stays open, an explicit close is needed so that a leftover producer cannot
        // block a later create-producer operation.
        askBrokerToCloseProducer();
    }

    if (result == ResultProducerFenced) {
        state_ = Producer_Fenced;
        failPendingMessages(result, false);
        auto client = client_.lock();
        if (client) {
            client->cleanupProducer(this);
        }
        lock.unlock();
        producerCreatedPromise_.setFailed(result);
        return result;
    }

    if (producerCreatedPromise_.isComplete() || retryOnCreationError_) {
        switch (result) {
            case ResultProducerBlockedQuotaExceededException:
                LOG_WARN(getName() << "Backlog is exceeded on topic. Sending exception to producer");
                failPendingMessages(ResultProducerBlockedQuotaExceededException, false);
                break;
            case ResultProducerBlockedQuotaExceededError:
                LOG_WARN(getName() << "Producer is blocked on creation because backlog is exceeded on topic");
                break;
            default:
                break;
        }

        // The producer had been created before (or retrying was requested explicitly), so a new
        // connection attempt is required whatever the error was.
        LOG_WARN(getName() << "Failed to reconnect producer: " << strResult(result));
        return ResultRetryable;
    }

    // First creation attempt: retry only if the (possibly timeout-converted) result allows it.
    const Result creationResult = convertToTimeoutIfNecessary(result, creationTimestamp_);
    if (isResultRetryable(creationResult)) {
        LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(creationResult));
        return creationResult;
    }

    LOG_ERROR(getName() << "Failed to create producer: " << strResult(creationResult));
    failPendingMessages(creationResult, false);
    state_ = Failed;
    lock.unlock();
    producerCreatedPromise_.setFailed(creationResult);
    return creationResult;
}