cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp (95 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "AsyncReceiveMessageCallback.h"
#include <system_error>
#include "ClientManagerImpl.h"
#include "ConsumeMessageType.h"
#include "rocketmq/Logger.h"
#include "spdlog/spdlog.h"
#include "ProcessQueue.h"
#include "PushConsumerImpl.h"
#include "rocketmq/ErrorCode.h"
ROCKETMQ_NAMESPACE_BEGIN
AsyncReceiveMessageCallback::AsyncReceiveMessageCallback(std::weak_ptr<ProcessQueue> process_queue)
: process_queue_(std::move(process_queue)) {
receive_message_later_ = std::bind(
&AsyncReceiveMessageCallback::checkThrottleThenReceive, this, std::placeholders::_1);
}
void AsyncReceiveMessageCallback::onCompletion(
const std::error_code& ec, std::string& attempt_id, const ReceiveMessageResult& result) {
std::shared_ptr<ProcessQueue> process_queue = process_queue_.lock();
if (!process_queue) {
SPDLOG_INFO("Process queue has been destructed.");
return;
}
auto consumer = process_queue->getConsumer().lock();
if (!consumer) {
return;
}
if (ec == ErrorCode::TooManyRequests) {
SPDLOG_WARN("Action of receiving message is throttled. Retry after 20ms. Queue={}", process_queue->simpleName());
receiveMessageLater(std::chrono::milliseconds(20), attempt_id);
return;
}
if (ec == ErrorCode::NoContent) {
checkThrottleThenReceive("");
return;
}
if (ec) {
SPDLOG_WARN("Receive message from {} failed. Cause: {}. Retry after 1 second.", process_queue->simpleName(),
ec.message());
receiveMessageLater(std::chrono::seconds(1), attempt_id);
return;
}
SPDLOG_DEBUG("Receive messages from broker[host={}] returns with status=FOUND, msgListSize={}, queue={}",
result.source_host, result.messages.size(), process_queue->simpleName());
process_queue->accountCache(result.messages);
consumer->getConsumeMessageService()->dispatch(process_queue, result.messages);
checkThrottleThenReceive("");
}
const char* AsyncReceiveMessageCallback::RECEIVE_LATER_TASK_NAME = "receive-later-task";
void AsyncReceiveMessageCallback::checkThrottleThenReceive(std::string attempt_id) {
auto process_queue = process_queue_.lock();
if (!process_queue) {
SPDLOG_WARN("Process queue should have been destructed");
return;
}
if (process_queue->shouldThrottle()) {
SPDLOG_INFO("Number of messages in {} exceeds throttle threshold. Receive messages later.",
process_queue->simpleName());
process_queue->syncIdleState();
receiveMessageLater(std::chrono::seconds(1), attempt_id);
} else {
// Receive message immediately
receiveMessageImmediately(attempt_id);
}
}
void AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds delay, std::string& attempt_id) {
auto process_queue = process_queue_.lock();
if (!process_queue) {
return;
}
auto client_instance = process_queue->getClientManager();
std::weak_ptr<AsyncReceiveMessageCallback> receive_callback_weak_ptr(shared_from_this());
auto task = [receive_callback_weak_ptr, &attempt_id]() {
auto async_receive_ptr = receive_callback_weak_ptr.lock();
if (async_receive_ptr) {
async_receive_ptr->checkThrottleThenReceive(attempt_id);
}
};
client_instance->getScheduler()->schedule(
task, RECEIVE_LATER_TASK_NAME, delay, std::chrono::seconds(0));
}
void AsyncReceiveMessageCallback::receiveMessageImmediately(std::string& attempt_id) {
auto process_queue_shared_ptr = process_queue_.lock();
if (!process_queue_shared_ptr) {
SPDLOG_INFO("ProcessQueue has been released. Ignore further receive message request-response cycles");
return;
}
std::shared_ptr<PushConsumerImpl> impl = process_queue_shared_ptr->getConsumer().lock();
if (!impl) {
SPDLOG_INFO("Owner of ProcessQueue[{}] has been released. Ignore further receive message request-response cycles",
process_queue_shared_ptr->simpleName());
return;
}
impl->receiveMessage(process_queue_shared_ptr->messageQueue(),
process_queue_shared_ptr->getFilterExpression(), attempt_id);
}
ROCKETMQ_NAMESPACE_END