cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp (115 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "ConsumeMessageServiceImpl.h"

#include <atomic>

#include "ConsumeStats.h"
#include "ConsumeTask.h"
#include "PushConsumerImpl.h"
#include "Tag.h"
#include "ThreadPoolImpl.h"
#include "rocketmq/ErrorCode.h"
#include "rocketmq/Logger.h"
#include "spdlog/spdlog.h"

ROCKETMQ_NAMESPACE_BEGIN

/**
 * Creates the service in the CREATED state and allocates (but does not start)
 * a ThreadPoolImpl constructed from the thread count.
 *
 * @param consumer          Weak handle to the owning push consumer.
 * @param thread_count      Value handed to the ThreadPoolImpl constructor.
 * @param message_listener  User callback; stored in message_listener_.
 */
ConsumeMessageServiceImpl::ConsumeMessageServiceImpl(std::weak_ptr<PushConsumerImpl> consumer, int thread_count,
                                                     MessageListener message_listener)
    : state_(State::CREATED),
      thread_count_(thread_count),
      pool_(absl::make_unique<ThreadPoolImpl>(thread_count_)),
      consumer_(std::move(consumer)),
      // Both by-value parameters are sinks: move them into the members instead of copying.
      message_listener_(std::move(message_listener)) {
}

/**
 * Starts the thread pool.
 *
 * Only the call that wins the CREATED -> STARTING transition starts the pool
 * and then publishes STARTED; every other call returns without doing anything.
 */
void ConsumeMessageServiceImpl::start() {
  State from = State::CREATED;
  if (!state_.compare_exchange_strong(from, State::STARTING, std::memory_order_relaxed)) {
    return;
  }

  pool_->start();
  state_.store(State::STARTED, std::memory_order_relaxed);
}

/**
 * Shuts the thread pool down.
 *
 * Only the call that wins the STARTED -> STOPPING transition shuts the pool
 * down and then publishes STOPPED; every other call returns without doing
 * anything.
 */
void ConsumeMessageServiceImpl::shutdown() {
  State from = State::STARTED;
  if (!state_.compare_exchange_strong(from, State::STOPPING, std::memory_order_relaxed)) {
    return;
  }

  pool_->shutdown();
  state_.store(State::STOPPED, std::memory_order_relaxed);
}

/**
 * @return The current lifecycle state, read with relaxed memory ordering.
 */
State ConsumeMessageServiceImpl::state() const {
  return state_.load(std::memory_order_relaxed);
}
void ConsumeMessageServiceImpl::dispatch(std::shared_ptr<ProcessQueue> process_queue, std::vector<MessageConstSharedPtr> messages) { auto consumer = consumer_.lock(); if (!consumer) { return; } if (consumer->config().subscriber.fifo) { auto consume_task = std::make_shared<ConsumeTask>(shared_from_this(), process_queue, std::move(messages)); pool_->submit([consume_task]() { consume_task->process(); }); return; } for (auto message : messages) { auto consume_task = std::make_shared<ConsumeTask>(shared_from_this(), process_queue, message); pool_->submit([consume_task]() { consume_task->process(); }); } } void ConsumeMessageServiceImpl::submit(std::shared_ptr<ConsumeTask> task) { auto consumer = consumer_.lock(); if (!consumer) { return; } pool_->submit([task]() { task->process(); }); } void ConsumeMessageServiceImpl::ack(const Message& message, std::function<void(const std::error_code&)> cb) { auto consumer = consumer_.lock(); if (!consumer) { return; } std::weak_ptr<PushConsumerImpl> client(consumer_); const auto& topic = message.topic(); const auto& message_id = message.id(); const auto& receipt_handle = message.extension().receipt_handle; auto callback = [cb, client, topic, message_id, receipt_handle](const std::error_code& ec) { auto consumer = client.lock(); // If the receipt_handle was already expired, it is safe to treat it as success. 
if (ec == ErrorCode::InvalidReceiptHandle) { SPDLOG_WARN("Broker complained bad receipt handle on ack message[MsgId={}, ReceiptHandle={}]", message_id, receipt_handle); cb(ErrorCode::Success); return; } cb(ec); }; consumer->ack(message, callback); } void ConsumeMessageServiceImpl::nack(const Message& message, std::function<void(const std::error_code&)> cb) { auto consumer = consumer_.lock(); if (!consumer) { return; } consumer->nack(message, cb); } void ConsumeMessageServiceImpl::forward(const Message& message, std::function<void(const std::error_code&)> cb) { auto consumer = consumer_.lock(); if (!consumer) { return; } consumer->forwardToDeadLetterQueue(message, cb); } void ConsumeMessageServiceImpl::schedule(std::shared_ptr<ConsumeTask> task, std::chrono::milliseconds delay) { } std::size_t ConsumeMessageServiceImpl::maxDeliveryAttempt() { std::shared_ptr<PushConsumerImpl> consumer = consumer_.lock(); if (!consumer) { SPDLOG_WARN("The consumer has already destructed"); return 0; } return consumer->maxDeliveryAttempts(); } std::weak_ptr<PushConsumerImpl> ConsumeMessageServiceImpl::consumer() { return consumer_; } bool ConsumeMessageServiceImpl::preHandle(const Message& message) { return true; } bool ConsumeMessageServiceImpl::postHandle(const Message& message, ConsumeResult result) { return true; } ROCKETMQ_NAMESPACE_END