void ConsumerImpl::processPossibleToDLQ()

in lib/ConsumerImpl.cc [1670:1773]


void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb) {
    auto messages = possibleSendToDeadLetterTopicMessages_.find(messageId);
    if (!messages) {
        cb(false);
        return;
    }

    // Initialize deadLetterProducer_
    if (!deadLetterProducer_) {
        std::lock_guard<std::mutex> createLock(createProducerLock_);
        if (!deadLetterProducer_) {
            deadLetterProducer_ = std::make_shared<Promise<Result, Producer>>();
            ProducerConfiguration producerConfiguration;
            producerConfiguration.setSchema(config_.getSchema());
            producerConfiguration.setBlockIfQueueFull(false);
            producerConfiguration.impl_->initialSubscriptionName =
                deadLetterPolicy_.getInitialSubscriptionName();
            ClientImplPtr client = client_.lock();
            if (client) {
                auto self = get_shared_this_ptr();
                client->createProducerAsync(
                    deadLetterPolicy_.getDeadLetterTopic(), producerConfiguration,
                    [self](Result res, Producer producer) {
                        if (res == ResultOk) {
                            self->deadLetterProducer_->setValue(producer);
                        } else {
                            LOG_ERROR("Dead letter producer create exception with topic: "
                                      << self->deadLetterPolicy_.getDeadLetterTopic() << " ex: " << res);
                            self->deadLetterProducer_.reset();
                        }
                    });
            } else {
                LOG_WARN(getName() << "Client is destroyed and cannot create dead letter producer.");
                return;
            }
        }
    }

    for (const auto& message : messages.value()) {
        std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()};
        deadLetterProducer_->getFuture().addListener([weakSelf, message, messageId, cb](Result res,
                                                                                        Producer producer) {
            auto self = weakSelf.lock();
            if (!self) {
                return;
            }
            auto originMessageId = message.getMessageId();
            std::stringstream originMessageIdStr;
            originMessageIdStr << originMessageId;
            MessageBuilder msgBuilder;
            msgBuilder.setAllocatedContent(const_cast<void*>(message.getData()), message.getLength())
                .setProperties(message.getProperties())
                .setProperty(PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr.str())
                .setProperty(SYSTEM_PROPERTY_REAL_TOPIC, message.getTopicName());
            if (message.hasPartitionKey()) {
                msgBuilder.setPartitionKey(message.getPartitionKey());
            }
            if (message.hasOrderingKey()) {
                msgBuilder.setOrderingKey(message.getOrderingKey());
            }
            producer.sendAsync(msgBuilder.build(), [weakSelf, originMessageId, messageId, cb](
                                                       Result res, const MessageId& messageIdInDLQ) {
                auto self = weakSelf.lock();
                if (!self) {
                    return;
                }
                if (res == ResultOk) {
                    if (self->state_ != Ready) {
                        LOG_WARN(
                            "Send to the DLQ successfully, but consumer is not ready. ignore acknowledge : "
                            << self->state_);
                        cb(false);
                        return;
                    }
                    self->possibleSendToDeadLetterTopicMessages_.remove(messageId);
                    self->acknowledgeAsync(originMessageId, [weakSelf, originMessageId, cb](Result result) {
                        auto self = weakSelf.lock();
                        if (!self) {
                            return;
                        }
                        if (result != ResultOk) {
                            LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {"
                                         << self->consumerName_ << "} Failed to acknowledge the message {"
                                         << originMessageId
                                         << "} of the original topic but send to the DLQ successfully : "
                                         << result);
                            cb(false);
                        } else {
                            LOG_DEBUG("Send msg:" << originMessageId
                                                  << "to DLQ success and acknowledge success.");
                            cb(true);
                        }
                    });
                } else {
                    LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {"
                                 << self->consumerName_ << "} Failed to send DLQ message to {"
                                 << self->deadLetterPolicy_.getDeadLetterTopic() << "} for message id "
                                 << "{" << originMessageId << "} : " << res);
                    cb(false);
                }
            });
        });
    }
}