in lib/ConsumerImpl.cc [1670:1773]
void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb) {
auto messages = possibleSendToDeadLetterTopicMessages_.find(messageId);
if (!messages) {
cb(false);
return;
}
// Initialize deadLetterProducer_
if (!deadLetterProducer_) {
std::lock_guard<std::mutex> createLock(createProducerLock_);
if (!deadLetterProducer_) {
deadLetterProducer_ = std::make_shared<Promise<Result, Producer>>();
ProducerConfiguration producerConfiguration;
producerConfiguration.setSchema(config_.getSchema());
producerConfiguration.setBlockIfQueueFull(false);
producerConfiguration.impl_->initialSubscriptionName =
deadLetterPolicy_.getInitialSubscriptionName();
ClientImplPtr client = client_.lock();
if (client) {
auto self = get_shared_this_ptr();
client->createProducerAsync(
deadLetterPolicy_.getDeadLetterTopic(), producerConfiguration,
[self](Result res, Producer producer) {
if (res == ResultOk) {
self->deadLetterProducer_->setValue(producer);
} else {
LOG_ERROR("Dead letter producer create exception with topic: "
<< self->deadLetterPolicy_.getDeadLetterTopic() << " ex: " << res);
self->deadLetterProducer_.reset();
}
});
} else {
LOG_WARN(getName() << "Client is destroyed and cannot create dead letter producer.");
return;
}
}
}
for (const auto& message : messages.value()) {
std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()};
deadLetterProducer_->getFuture().addListener([weakSelf, message, messageId, cb](Result res,
Producer producer) {
auto self = weakSelf.lock();
if (!self) {
return;
}
auto originMessageId = message.getMessageId();
std::stringstream originMessageIdStr;
originMessageIdStr << originMessageId;
MessageBuilder msgBuilder;
msgBuilder.setAllocatedContent(const_cast<void*>(message.getData()), message.getLength())
.setProperties(message.getProperties())
.setProperty(PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr.str())
.setProperty(SYSTEM_PROPERTY_REAL_TOPIC, message.getTopicName());
if (message.hasPartitionKey()) {
msgBuilder.setPartitionKey(message.getPartitionKey());
}
if (message.hasOrderingKey()) {
msgBuilder.setOrderingKey(message.getOrderingKey());
}
producer.sendAsync(msgBuilder.build(), [weakSelf, originMessageId, messageId, cb](
Result res, const MessageId& messageIdInDLQ) {
auto self = weakSelf.lock();
if (!self) {
return;
}
if (res == ResultOk) {
if (self->state_ != Ready) {
LOG_WARN(
"Send to the DLQ successfully, but consumer is not ready. ignore acknowledge : "
<< self->state_);
cb(false);
return;
}
self->possibleSendToDeadLetterTopicMessages_.remove(messageId);
self->acknowledgeAsync(originMessageId, [weakSelf, originMessageId, cb](Result result) {
auto self = weakSelf.lock();
if (!self) {
return;
}
if (result != ResultOk) {
LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {"
<< self->consumerName_ << "} Failed to acknowledge the message {"
<< originMessageId
<< "} of the original topic but send to the DLQ successfully : "
<< result);
cb(false);
} else {
LOG_DEBUG("Send msg:" << originMessageId
<< "to DLQ success and acknowledge success.");
cb(true);
}
});
} else {
LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {"
<< self->consumerName_ << "} Failed to send DLQ message to {"
<< self->deadLetterPolicy_.getDeadLetterTopic() << "} for message id "
<< "{" << originMessageId << "} : " << res);
cb(false);
}
});
});
}
}