# python/rocketmq/v5/consumer/simple_consumer.py (382 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import functools
import threading
from concurrent.futures import Future
from rocketmq.grpc_protocol import (AckMessageEntry, AckMessageRequest,
ChangeInvisibleDurationRequest, ClientType,
HeartbeatRequest,
NotifyClientTerminationRequest,
ReceiveMessageRequest, Settings,
Subscription, TelemetryCommand)
from rocketmq.v5.client import Client, ClientConfiguration
from rocketmq.v5.client.balancer import QueueSelector
from rocketmq.v5.exception import (IllegalArgumentException,
IllegalStateException)
from rocketmq.v5.log import logger
from rocketmq.v5.model import CallbackResult, FilterExpression, Message
from rocketmq.v5.util import (AtomicInteger, ConcurrentMap,
MessagingResultChecker, Misc)
class SimpleConsumer(Client):
    """Pull-style consumer: the caller explicitly receives and acks messages.

    ``receive`` / ``receive_async`` fetch messages from one queue of one
    subscribed topic, ``ack`` / ``ack_async`` acknowledge a message, and
    ``change_invisible_duration`` / ``change_invisible_duration_async``
    change the invisible duration of a received message.

    The synchronous variants block on the underlying RPC future. The
    ``*_async`` variants return a ``concurrent.futures.Future`` whose outcome
    is handed to ``self._submit_callback`` wrapped in a ``CallbackResult``.
    """

    def __init__(
        self,
        client_configuration: ClientConfiguration,
        consumer_group,
        subscription: dict = None,
        await_duration=20,
    ):
        """Validate the arguments and initialise consumer state.

        :param client_configuration: configuration forwarded to ``Client``.
        :param consumer_group: consumer group name; must be non-blank and
            accepted by ``Misc.is_valid_consumer_group``.
        :param subscription: optional initial ``{topic: FilterExpression}``
            mapping; its keys are passed to ``Client`` as the topic list.
        :param await_duration: long polling timeout in seconds.
        :raises IllegalArgumentException: if ``consumer_group`` is blank or
            invalid, or ``await_duration`` is ``None``.
        """
        if consumer_group is None or consumer_group.strip() == "":
            raise IllegalArgumentException("consumerGroup should not be null")
        if Misc.is_valid_consumer_group(consumer_group) is False:
            raise IllegalArgumentException(
                f"consumerGroup does not match the regex [regex={Misc.CONSUMER_GROUP_PATTERN}]"
            )
        if await_duration is None:
            raise IllegalArgumentException("awaitDuration should not be null")

        super().__init__(
            client_configuration,
            None if subscription is None else subscription.keys(),
            ClientType.SIMPLE_CONSUMER,
        )
        self.__consumer_group = consumer_group
        self.__await_duration = await_duration  # long polling timeout, seconds
        # <String /* topic */, FilterExpression>
        self.__subscriptions = ConcurrentMap()
        if subscription is not None:
            self.__subscriptions.update(subscription)
        # <String /* topic */, QueueSelector> (filled lazily on first receive)
        self.__receive_queue_selectors = ConcurrentMap()
        # Monotonic counter used to rotate over the subscribed topics.
        self.__topic_index = AtomicInteger(0)
        # NOTE(review): not referenced anywhere else in this class.
        self.__queue_index_lock = threading.Lock()

    def __str__(self):
        """Return ``<client type name>:<consumer group>, client_id:<id>``."""
        return f"{ClientType.Name(self.client_type)}:{self.consumer_group}, client_id:{self.client_id}"

    def subscribe(self, topic, filter_expression: FilterExpression = None):
        """Add a subscription for ``topic`` or replace its filter expression.

        Route data is retrieved only when the topic is not subscribed yet;
        the filter expression is stored in either case.

        :param topic: topic name.
        :param filter_expression: filter to apply; a default
            ``FilterExpression()`` is used when ``None``.
        :raises IllegalStateException: if the consumer is not running.
        """
        if self.is_running is False:
            raise IllegalStateException(
                "unable to add subscription because simple consumer is not running"
            )
        try:
            if not self.__subscriptions.contains(topic):
                self._retrieve_topic_route_data(topic)
            self.__subscriptions.put(
                topic,
                (
                    filter_expression
                    if filter_expression is not None
                    else FilterExpression()
                ),
            )
        except Exception as e:
            logger.error(f"subscribe raise exception: {e}")
            raise

    def unsubscribe(self, topic):
        """Remove the subscription for ``topic`` if there is one.

        :raises IllegalStateException: if the consumer is not running.
        """
        if self.is_running is False:
            raise IllegalStateException(
                "unable to remove subscription because simple consumer is not running"
            )
        # Use the map's own membership API, exactly as subscribe() does.
        if self.__subscriptions.contains(topic):
            self.__subscriptions.remove(topic)
            self._remove_unused_topic_route_data(topic)

    def receive(self, max_message_num, invisible_duration):
        """Receive messages synchronously from the next topic/queue.

        :param max_message_num: batch size of the request; must be > 0.
        :param invisible_duration: invisible duration in seconds.
        :return: list of ``Message`` built from the response stream.
        :raises IllegalStateException: if the consumer is not running.
        :raises IllegalArgumentException: if nothing is subscribed or
            ``max_message_num`` is not positive.
        """
        future, queue = self.__receive(max_message_num, invisible_duration)
        # The response stream has to be drained on the RPC channel's IO loop.
        read_future = asyncio.run_coroutine_threadsafe(
            self.__receive_message_response(future.result()),
            self._rpc_channel_io_loop(),
        )
        return self.__handle_receive_message_response(read_future.result(), queue)

    def receive_async(self, max_message_num, invisible_duration) -> Future:
        """Receive messages without blocking on the response stream.

        Argument validation and request submission happen on the calling
        thread, so the exceptions documented on ``receive`` for those steps
        are raised directly rather than through the returned future.

        :return: a ``Future`` handed to ``_submit_callback`` together with
            either the received messages or the raised exception.
        """
        future, queue = self.__receive(max_message_num, invisible_duration)
        read_future = asyncio.run_coroutine_threadsafe(
            self.__receive_message_response(future.result()),
            self._rpc_channel_io_loop(),
        )
        ret_future = Future()
        read_future.add_done_callback(
            functools.partial(
                self.__receive_message_callback, ret_future=ret_future, queue=queue
            )
        )
        return ret_future

    def ack(self, message: Message):
        """Acknowledge ``message`` and block until the response is checked.

        :raises IllegalStateException: if the consumer is not running.
        """
        self.__handle_ack_result(self.__ack(message))

    def ack_async(self, message: Message) -> Future:
        """Acknowledge ``message``; the outcome is reported via the future.

        :raises IllegalStateException: if the consumer is not running.
        """
        future = self.__ack(message)
        ret_future = Future()
        future.add_done_callback(
            functools.partial(self.__handle_ack_result, ret_future=ret_future)
        )
        return ret_future

    def change_invisible_duration(self, message: Message, invisible_duration):
        """Change the invisible duration of ``message`` synchronously.

        On success ``message.receipt_handle`` is replaced by the handle
        returned in the response.

        :param invisible_duration: new invisible duration in seconds.
        :raises IllegalStateException: if the consumer is not running.
        """
        future = self.__change_invisible_duration(message, invisible_duration)
        self.__handle_change_invisible_result(future, message)

    def change_invisible_duration_async(
        self, message: Message, invisible_duration
    ) -> Future:
        """Asynchronous variant of ``change_invisible_duration``.

        :raises IllegalStateException: if the consumer is not running.
        """
        future = self.__change_invisible_duration(message, invisible_duration)
        ret_future = Future()
        future.add_done_callback(
            functools.partial(
                self.__handle_change_invisible_result,
                message=message,
                ret_future=ret_future,
            )
        )
        return ret_future

    # ---- override ----

    def _start_success(self):
        """Log that the consumer started."""
        logger.info(f"{self.__str__()} start success.")

    def _start_failure(self):
        """Log that the consumer failed to start."""
        # A failed start is an error condition, not routine information.
        logger.error(f"{self.__str__()} start failed.")

    def _sync_setting_req(self, endpoints):
        """Build the ``TelemetryCommand`` carrying this consumer's settings.

        :param endpoints: object whose ``endpoints`` attribute is copied into
            ``settings.access_point``.
        """
        namespace = self.client_configuration.namespace

        subscription = Subscription()
        subscription.group.name = self.__consumer_group
        subscription.group.resource_namespace = namespace
        subscription.fifo = False
        subscription.long_polling_timeout.seconds = self.__await_duration
        for topic, expression in self.__subscriptions.items():
            sub_entry = subscription.subscriptions.add()
            sub_entry.topic.name = topic
            sub_entry.topic.resource_namespace = namespace
            sub_entry.expression.type = expression.filter_type
            sub_entry.expression.expression = expression.expression

        settings = Settings()
        settings.client_type = self.client_type
        settings.access_point.CopyFrom(endpoints.endpoints)
        settings.request_timeout.seconds = self.client_configuration.request_timeout
        settings.subscription.CopyFrom(subscription)
        # NOTE(review): 6 is presumably the protocol's language code for
        # Python -- confirm against the Language enum.
        settings.user_agent.language = 6
        settings.user_agent.version = Misc.sdk_version()
        settings.user_agent.platform = Misc.get_os_description()
        settings.user_agent.hostname = Misc.get_local_ip()
        settings.metric.on = False

        cmd = TelemetryCommand()
        cmd.settings.CopyFrom(settings)
        return cmd

    def _heartbeat_req(self):
        """Build the ``HeartbeatRequest`` for this consumer group."""
        req = HeartbeatRequest()
        req.client_type = self.client_type
        req.group.name = self.__consumer_group
        req.group.resource_namespace = self.client_configuration.namespace
        return req

    def _notify_client_termination_req(self):
        """Build the ``NotifyClientTerminationRequest`` for this group."""
        req = NotifyClientTerminationRequest()
        req.group.resource_namespace = self.client_configuration.namespace
        req.group.name = self.__consumer_group
        return req

    def _update_queue_selector(self, topic, topic_route):
        """Forward new route data to the topic's queue selector, if any."""
        queue_selector = self.__receive_queue_selectors.get(topic)
        if queue_selector is None:
            # No receive has been issued for this topic yet.
            return
        queue_selector.update(topic_route)

    def shutdown(self):
        """Shut the client down, logging before and after."""
        logger.info(f"begin to to shutdown {self.__str__()}.")
        super().shutdown()
        logger.info(f"shutdown {self.__str__()} success.")

    # ---- private ----

    # receive message
    def __select_topic_for_receive(self):
        """Pick the next subscribed topic in rotation.

        :raises IllegalArgumentException: if there is no subscription.
        """
        try:
            # Take one snapshot so the length and the lookup agree even if a
            # subscribe/unsubscribe happens concurrently.
            topics = list(self.__subscriptions.keys())
            if not topics:
                raise IllegalArgumentException("There is no topic to receive message")
            return topics[self.__topic_index.get_and_increment() % len(topics)]
        except Exception as e:
            logger.error(
                f"simple consumer select topic for receive message exception: {e}"
            )
            raise

    def __select_topic_queue(self, topic):
        """Pick the next queue of ``topic`` from its queue selector.

        The selector is created from the topic's route data the first time
        the topic is used and reused afterwards.
        """
        try:
            route = self._retrieve_topic_route_data(topic)
            queue_selector = self.__receive_queue_selectors.put_if_absent(
                topic, QueueSelector.simple_consumer_queue_selector(route)
            )
            return queue_selector.select_next_queue()
        except Exception as e:
            logger.error(f"simple consumer select topic queue raise exception: {e}")
            raise

    def __receive_pre_check(self, max_message_num):
        """Validate consumer state and arguments before a receive.

        :raises IllegalStateException: if the consumer is not running.
        :raises IllegalArgumentException: if nothing is subscribed or
            ``max_message_num`` is not positive.
        """
        if self.is_running is False:
            raise IllegalStateException("consumer is not running now.")
        if len(self.__subscriptions.keys()) == 0:
            raise IllegalArgumentException("There is no topic to receive message")
        if max_message_num <= 0:
            raise IllegalArgumentException("max_message_num must be greater than 0")

    def __receive_req(self, topic, queue, max_message_num, invisible_duration):
        """Build the ``ReceiveMessageRequest`` for ``queue`` of ``topic``."""
        filter_expression = self.__subscriptions.get(topic)
        req = ReceiveMessageRequest()
        req.group.name = self.__consumer_group
        req.group.resource_namespace = self.client_configuration.namespace
        req.message_queue.CopyFrom(queue.message_queue0())
        req.filter_expression.type = filter_expression.filter_type
        req.filter_expression.expression = filter_expression.expression
        req.batch_size = max_message_num
        req.invisible_duration.seconds = invisible_duration
        req.long_polling_timeout.seconds = self.__await_duration
        req.auto_renew = False
        return req

    def __receive(self, max_message_num, invisible_duration):
        """Validate, choose a topic and queue, and submit the receive RPC.

        :return: ``(future, queue)`` where ``future`` is the value returned
            by ``rpc_client.receive_message_async`` and ``queue`` is the
            selected queue.
        """
        if self.is_running is False:
            raise IllegalStateException(
                "unable to receive messages because simple consumer is not running"
            )
        self.__receive_pre_check(max_message_num)
        topic = self.__select_topic_for_receive()
        queue = self.__select_topic_queue(topic)
        req = self.__receive_req(topic, queue, max_message_num, invisible_duration)
        # The RPC must outlive the long poll, so add the await duration.
        timeout = self.client_configuration.request_timeout + self.__await_duration
        future = self.rpc_client.receive_message_async(
            queue.endpoints, req, metadata=self._sign(), timeout=timeout
        )
        return future, queue

    def __receive_message_callback(self, future, ret_future, queue):
        """Done-callback for ``receive_async``: submit messages or the error."""
        try:
            responses = future.result()
            messages = self.__handle_receive_message_response(responses, queue)
            self._submit_callback(
                CallbackResult.async_receive_callback_result(ret_future, messages)
            )
        except Exception as e:
            self._submit_callback(
                CallbackResult.async_receive_callback_result(ret_future, e, False)
            )

    async def __receive_message_response(self, unary_stream_call):
        """Collect the responses that carry a ``message`` or a ``status``."""
        try:
            responses = list()
            async for res in unary_stream_call:
                if res.HasField("message") or res.HasField("status"):
                    logger.debug(
                        f"consumer:{self.__consumer_group} receive response: {res}"
                    )
                    responses.append(res)
            return responses
        except Exception as e:
            logger.error(
                f"consumer:{self.__consumer_group} receive message exception: {e}"
            )
            raise

    def __handle_receive_message_response(self, responses, queue):
        """Convert stream responses to messages and check the final status.

        Every message is tagged with ``queue.endpoints``. The last ``status``
        seen (``None`` if there was none) is passed to
        ``MessagingResultChecker.check``.
        """
        messages = list()
        status = None
        for res in responses:
            if res.HasField("status"):
                logger.debug(
                    f"simple_consumer[{self.__consumer_group}] receive_message, code:{res.status.code}, message:{res.status.message}."
                )
                status = res.status
            elif res.HasField("message"):
                msg = Message().fromProtobuf(res.message)
                msg.endpoints = queue.endpoints
                messages.append(msg)
        MessagingResultChecker.check(status)
        return messages

    # ack message
    def __ack_req(self, message: Message):
        """Build an ``AckMessageRequest`` with a single entry for ``message``."""
        req = AckMessageRequest()
        req.group.name = self.__consumer_group
        req.group.resource_namespace = self.client_configuration.namespace
        req.topic.name = message.topic
        req.topic.resource_namespace = self.client_configuration.namespace
        msg_entry = AckMessageEntry()
        msg_entry.message_id = message.message_id
        msg_entry.receipt_handle = message.receipt_handle
        req.entries.append(msg_entry)
        return req

    def __ack(self, message: Message):
        """Submit the ack RPC to ``message.endpoints`` and return its future.

        :raises IllegalStateException: if the consumer is not running.
        """
        if self.is_running is False:
            raise IllegalStateException(
                "unable to ack message because simple consumer is not running"
            )
        return self.rpc_client.ack_message_async(
            message.endpoints,
            self.__ack_req(message),
            metadata=self._sign(),
            timeout=self.client_configuration.request_timeout,
        )

    def __handle_ack_result(self, future, ret_future=None):
        """Check an ack response.

        With ``ret_future`` unset (sync path) failures are raised; otherwise
        success or failure is submitted through ``_submit_callback``.
        """
        try:
            res = future.result()
            logger.debug(
                f"consumer[{self.__consumer_group}] ack response, {res.status}"
            )
            MessagingResultChecker.check(res.status)
            if ret_future is not None:
                self._submit_callback(
                    CallbackResult.async_ack_callback_result(ret_future, None)
                )
        except Exception as e:
            if ret_future is None:
                raise
            self._submit_callback(
                CallbackResult.async_ack_callback_result(ret_future, e, False)
            )

    # change_invisible
    def __change_invisible_req(self, message: Message, invisible_duration):
        """Build the ``ChangeInvisibleDurationRequest`` for ``message``."""
        req = ChangeInvisibleDurationRequest()
        req.topic.name = message.topic
        req.topic.resource_namespace = self.client_configuration.namespace
        req.group.name = self.__consumer_group
        req.group.resource_namespace = self.client_configuration.namespace
        req.receipt_handle = message.receipt_handle
        req.invisible_duration.seconds = invisible_duration
        req.message_id = message.message_id
        return req

    def __change_invisible_duration(self, message: Message, invisible_duration):
        """Submit the change-invisible-duration RPC and return its future.

        :raises IllegalStateException: if the consumer is not running.
        """
        if self.is_running is False:
            raise IllegalStateException(
                "unable to change invisible duration because simple consumer is not running"
            )
        return self.rpc_client.change_invisible_duration_async(
            message.endpoints,
            self.__change_invisible_req(message, invisible_duration),
            metadata=self._sign(),
            timeout=self.client_configuration.request_timeout,
        )

    def __handle_change_invisible_result(self, future, message, ret_future=None):
        """Check a change-invisible response and refresh the receipt handle.

        With ``ret_future`` unset (sync path) failures are raised; otherwise
        success or failure is submitted through ``_submit_callback``.
        """
        try:
            res = future.result()
            logger.debug(
                f"consumer[{self.__consumer_group}] change invisible response, {res.status}"
            )
            # Validate first so a rejected request does not overwrite the
            # handle the message already holds.
            MessagingResultChecker.check(res.status)
            message.receipt_handle = res.receipt_handle
            if ret_future is not None:
                self._submit_callback(
                    CallbackResult.async_change_invisible_duration_callback_result(
                        ret_future, None
                    )
                )
        except Exception as e:
            if ret_future is None:
                raise
            self._submit_callback(
                CallbackResult.async_change_invisible_duration_callback_result(
                    ret_future, e, False
                )
            )

    # ---- property ----

    @property
    def consumer_group(self):
        """Consumer group name given at construction."""
        return self.__consumer_group

    @property
    def await_duration(self):
        """Long polling timeout in seconds."""
        return self.__await_duration

    @await_duration.setter
    def await_duration(self, await_duration):
        # Same rule the constructor enforces.
        if await_duration is None:
            raise IllegalArgumentException("awaitDuration should not be null")
        self.__await_duration = await_duration