cpp/source/rocketmq/include/ProcessQueueImpl.h (65 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <memory>
#include <set>
#include "ClientManager.h"
#include "MessageExt.h"
#include "MixAll.h"
#include "ProcessQueue.h"
#include "ReceiveMessageCallback.h"
#include "TopicAssignmentInfo.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "rocketmq/FilterExpression.h"
ROCKETMQ_NAMESPACE_BEGIN
class PushConsumerImpl;
/**
* @brief Once messages are fetched(either pulled or popped) from remote server, they are firstly put into cache.
* Dispatcher thread, after waking up, will submit them into thread-pool. Messages at this phase are called "inflight"
* state. Once messages are processed by user-passed-in callback, their quota will be released for future incoming
* messages.
*/
class ProcessQueueImpl : virtual public ProcessQueue {
public:
ProcessQueueImpl(rmq::MessageQueue message_queue,
FilterExpression filter_expression,
std::weak_ptr<PushConsumerImpl> consumer,
std::shared_ptr<ClientManager> client_instance);
~ProcessQueueImpl() override;
void callback(std::shared_ptr<AsyncReceiveMessageCallback> callback) override;
bool expired() const override;
bool shouldThrottle() const override;
const FilterExpression& getFilterExpression() const override;
std::weak_ptr<PushConsumerImpl> getConsumer() override;
std::shared_ptr<ClientManager> getClientManager() override;
void receiveMessage(std::string& attempt_id) override;
const std::string& simpleName() const override {
return simple_name_;
}
std::string topic() const override {
return message_queue_.topic().name();
}
std::uint64_t cachedMessageQuantity() const override;
std::uint64_t cachedMessageMemory() const override;
/**
* Put message fetched from broker into cache.
*
* @param messages
*/
void accountCache(const std::vector<MessageConstSharedPtr>& messages) override;
void syncIdleState() override {
idle_since_ = std::chrono::steady_clock::now();
}
void release(uint64_t body_size) override;
const rmq::MessageQueue& messageQueue() const override {
return message_queue_;
}
private:
rmq::MessageQueue message_queue_;
/**
* Expression used to filter message in the server side.
*/
const FilterExpression filter_expression_;
std::chrono::milliseconds invisible_time_;
std::chrono::steady_clock::time_point idle_since_{std::chrono::steady_clock::now()};
absl::Time create_timestamp_{absl::Now()};
std::string simple_name_;
std::weak_ptr<PushConsumerImpl> consumer_;
std::shared_ptr<ClientManager> client_manager_;
std::shared_ptr<AsyncReceiveMessageCallback> receive_callback_;
/**
* @brief Quantity of the cached messages.
*
*/
std::atomic<uint32_t> cached_message_quantity_;
/**
* @brief Total body memory size of the cached messages.
*
*/
std::atomic<uint64_t> cached_message_memory_;
void popMessage(std::string& attempt_id);
void wrapPopMessageRequest(absl::flat_hash_map<std::string, std::string>& metadata,
rmq::ReceiveMessageRequest& request, std::string& attempt_id);
void wrapFilterExpression(rmq::FilterExpression* filter_expression);
};
ROCKETMQ_NAMESPACE_END