in lib/ProducerImpl.cc [189:310]
Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const ResponseData& responseData) {
Result handleResult = ResultOk;
Lock lock(mutex_);
LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << strResult(result));
// make sure we're still in the Pending/Ready state, closeAsync could have been invoked
// while waiting for this response if using lazy producers
const auto state = state_.load();
if (state != Ready && state != Pending) {
LOG_DEBUG("Producer created response received but producer already closed");
failPendingMessages(ResultAlreadyClosed, false);
if (result == ResultOk || result == ResultTimeout) {
auto client = client_.lock();
if (client) {
int requestId = client->newRequestId();
cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
}
}
if (!producerCreatedPromise_.isComplete()) {
lock.unlock();
producerCreatedPromise_.setFailed(ResultAlreadyClosed);
}
return ResultAlreadyClosed;
}
if (result == ResultOk) {
// We are now reconnected to broker and clear to send messages. Re-send all pending messages and
// set the cnx pointer so that new messages will be sent immediately
LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());
producerName_ = responseData.producerName;
schemaVersion_ = responseData.schemaVersion;
producerStr_ = "[" + topic() + ", " + producerName_ + "] ";
topicEpoch = responseData.topicEpoch;
if (lastSequenceIdPublished_ == -1 && conf_.getInitialSequenceId() == -1) {
lastSequenceIdPublished_ = responseData.lastSequenceId;
msgSequenceGenerator_ = lastSequenceIdPublished_ + 1;
}
resendMessages(cnx);
setCnx(cnx);
state_ = Ready;
backoff_.reset();
if (conf_.isEncryptionEnabled()) {
auto weakSelf = weak_from_this();
dataKeyRefreshTask_.setCallback([this, weakSelf](const PeriodicTask::ErrorCode& ec) {
auto self = weakSelf.lock();
if (!self) {
return;
}
if (ec) {
LOG_ERROR("DataKeyRefresh timer failed: " << ec.message());
return;
}
msgCrypto_->addPublicKeyCipher(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader());
});
}
// if the producer is lazy the send timeout timer is already running
if (!(conf_.getLazyStartPartitionedProducers() &&
conf_.getAccessMode() == ProducerConfiguration::Shared)) {
startSendTimeoutTimer();
}
lock.unlock();
producerCreatedPromise_.setValue(shared_from_this());
} else {
// Producer creation failed
if (result == ResultTimeout) {
// Creating the producer has timed out. We need to ensure the broker closes the producer
// in case it was indeed created, otherwise it might prevent new create producer operation,
// since we are not closing the connection
auto client = client_.lock();
if (client) {
int requestId = client->newRequestId();
cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
}
}
if (result == ResultProducerFenced) {
state_ = Producer_Fenced;
failPendingMessages(result, false);
auto client = client_.lock();
if (client) {
client->cleanupProducer(this);
}
lock.unlock();
producerCreatedPromise_.setFailed(result);
handleResult = result;
} else if (producerCreatedPromise_.isComplete() || retryOnCreationError_) {
if (result == ResultProducerBlockedQuotaExceededException) {
LOG_WARN(getName() << "Backlog is exceeded on topic. Sending exception to producer");
failPendingMessages(ResultProducerBlockedQuotaExceededException, false);
} else if (result == ResultProducerBlockedQuotaExceededError) {
LOG_WARN(getName() << "Producer is blocked on creation because backlog is exceeded on topic");
}
// Producer had already been initially created, we need to retry connecting in any case
LOG_WARN(getName() << "Failed to reconnect producer: " << strResult(result));
handleResult = ResultRetryable;
} else {
// Producer was not yet created, retry to connect to broker if it's possible
handleResult = convertToTimeoutIfNecessary(result, creationTimestamp_);
if (isResultRetryable(handleResult)) {
LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(handleResult));
} else {
LOG_ERROR(getName() << "Failed to create producer: " << strResult(handleResult));
failPendingMessages(handleResult, false);
state_ = Failed;
lock.unlock();
producerCreatedPromise_.setFailed(handleResult);
}
}
}
return handleResult;
}