python/rocketmq/v5/client/client.py

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import functools
import os
import threading
from asyncio import InvalidStateError
from concurrent.futures import ThreadPoolExecutor

from grpc.aio import AioRpcError

from rocketmq.grpc_protocol import ClientType, Code, QueryRouteRequest
from rocketmq.v5.client.connection import RpcClient
from rocketmq.v5.client.metrics import ClientMetrics
from rocketmq.v5.exception import (IllegalArgumentException,
                                   IllegalStateException)
from rocketmq.v5.log import logger
from rocketmq.v5.model import TopicRouteData
from rocketmq.v5.util import (ClientId, ConcurrentMap, MessagingResultChecker,
                              Misc, Signature)


class Client:

    def __init__(
        self, client_configuration, topics, client_type: ClientType, tls_enable=False
    ):
        if client_configuration is None:
            raise IllegalArgumentException("clientConfiguration should not be null.")
        self.__client_configuration = client_configuration
        self.__client_type = client_type
        self.__client_id = ClientId().client_id
        # {topic, topicRouteData}
        self.__topic_route_cache = ConcurrentMap()
        self.__rpc_client = RpcClient(tls_enable)
        self.__client_metrics = ClientMetrics(self.__client_id, client_configuration)
        self.__topic_route_scheduler = None
        self.__heartbeat_scheduler = None
        self.__sync_setting_scheduler = None
        self.__clear_idle_rpc_channels_scheduler = None
        self.__topic_route_scheduler_threading_event = None
        self.__heartbeat_scheduler_threading_event = None
        self.__sync_setting_scheduler_threading_event = None
        self.__clear_idle_rpc_channels_threading_event = None
        if topics is not None:
            self.__topics = set(
                filter(lambda topic: Misc.is_valid_topic(topic), topics)
            )
        else:
            self.__topics = set()
        self.__client_callback_executor = None
        self.__is_running = False
        self.__client_thread_task_enabled = False
        self.__had_shutdown = False

    def startup(self):
        try:
            if self.__had_shutdown is True:
                raise Exception(
                    f"client:{self.__client_id} had shutdown, can't startup again."
                )
            try:
                # pre update topic route for producer or consumer
                for topic in self.__topics:
                    self.__update_topic_route(topic)
            except Exception as e:
                # ignore this exception and retrieve again when calling send or receive
                logger.warn(
                    f"update topic exception when client startup, ignore it, try it again in scheduler. exception: {e}"
                )
            self.__start_scheduler()
            self.__start_async_rpc_callback_handler()
            self.__is_running = True
            self._start_success()
        except Exception as e:
            self.__is_running = False
            self.__stop_client_threads()
            self._start_failure()
            logger.error(f"client:{self.__client_id} startup exception: {e}")
            raise e

    def shutdown(self):
        if self.is_running is False:
            raise IllegalStateException(f"client:{self.__client_id} is not running.")
        if self.__had_shutdown is True:
            raise IllegalStateException(f"client:{self.__client_id} had shutdown.")
        try:
            self.__stop_client_threads()
            self.__notify_client_termination()
            self.__rpc_client.stop()
            self.__topic_route_cache.clear()
            self.__topics.clear()
            self.__had_shutdown = True
            self.__is_running = False
        except Exception as e:
            logger.error(f"{self.__str__()} shutdown exception: {e}")
            raise e

    """ abstract """

    def _start_success(self):
        """each subclass implements its own actions after a successful startup"""
        pass

    def _start_failure(self):
        """each subclass implements its own actions after a startup failure"""
        pass

    def _sync_setting_req(self, endpoints):
        """each subclass implements its own telemetry settings scheme"""
        pass

    def _heartbeat_req(self):
        """each subclass implements its own heartbeat request"""
        pass

    def _notify_client_termination_req(self):
        """each subclass implements its own client termination request"""
        pass

    def _update_queue_selector(self, topic, topic_route):
        """each subclass implements its own queue selector"""
        pass

    """ scheduler """

    def __start_scheduler(self):
        # start 4 schedulers in different threads, each thread use the same asyncio event loop.
        try:
            # update topic route every 30 seconds
            self.__client_thread_task_enabled = True
            self.__topic_route_scheduler = threading.Thread(
                target=self.__schedule_update_topic_route_cache,
                name="update_topic_route_schedule_thread",
            )
            self.__topic_route_scheduler_threading_event = threading.Event()
            self.__topic_route_scheduler.start()
            logger.info("start topic route scheduler success.")

            # send heartbeat to all endpoints every 10 seconds
            self.__heartbeat_scheduler = threading.Thread(
                target=self.__schedule_heartbeat, name="heartbeat_schedule_thread"
            )
            self.__heartbeat_scheduler_threading_event = threading.Event()
            self.__heartbeat_scheduler.start()
            logger.info("start heartbeat scheduler success.")

            # send client setting to all endpoints every 5 seconds
            self.__sync_setting_scheduler = threading.Thread(
                target=self.__schedule_update_setting,
                name="sync_setting_schedule_thread",
            )
            self.__sync_setting_scheduler_threading_event = threading.Event()
            self.__sync_setting_scheduler.start()
            logger.info("start sync setting scheduler success.")

            # clear unused grpc channel(>30 minutes) every 60 seconds
            self.__clear_idle_rpc_channels_scheduler = threading.Thread(
                target=self.__schedule_clear_idle_rpc_channels,
                name="clear_idle_rpc_channel_schedule_thread",
            )
            self.__clear_idle_rpc_channels_threading_event = threading.Event()
            self.__clear_idle_rpc_channels_scheduler.start()
            logger.info("start clear idle rpc channels scheduler success.")
        except Exception as e:
            logger.info(f"start scheduler exception: {e}")
            self.__stop_client_threads()
            raise e

    def __schedule_update_topic_route_cache(self):
        asyncio.set_event_loop(self._rpc_channel_io_loop())
        while True:
            if self.__client_thread_task_enabled is True:
                self.__topic_route_scheduler_threading_event.wait(30)
                logger.debug(f"{self.__str__()} run update topic route in scheduler.")
                # update topic route for each topic in cache
                topics = self.__topic_route_cache.keys()
                for topic in topics:
                    try:
                        if self.__client_thread_task_enabled is True:
                            self.__update_topic_route_async(topic)
                    except Exception as e:
                        logger.error(
                            f"{self.__str__()} scheduler update topic:{topic} route raise exception: {e}"
                        )
            else:
                break
        logger.info(
            f"{self.__str__()} stop scheduler for update topic route cache success."
        )

    def __schedule_heartbeat(self):
        asyncio.set_event_loop(self._rpc_channel_io_loop())
        while True:
            if self.__client_thread_task_enabled is True:
                self.__heartbeat_scheduler_threading_event.wait(10)
                logger.debug(f"{self.__str__()} run send heartbeat in scheduler.")
                all_endpoints = self.__get_all_endpoints().values()
                try:
                    for endpoints in all_endpoints:
                        if self.__client_thread_task_enabled is True:
                            self.__heartbeat_async(endpoints)
                except Exception as e:
                    logger.error(
                        f"{self.__str__()} scheduler send heartbeat raise exception: {e}"
                    )
            else:
                break
        logger.info(f"{self.__str__()} stop scheduler for heartbeat success.")

    def __schedule_update_setting(self):
        asyncio.set_event_loop(self._rpc_channel_io_loop())
        while True:
            if self.__client_thread_task_enabled is True:
                self.__sync_setting_scheduler_threading_event.wait(5)
                logger.debug(f"{self.__str__()} run update setting in scheduler.")
                try:
                    all_endpoints = self.__get_all_endpoints().values()
                    for endpoints in all_endpoints:
                        if self.__client_thread_task_enabled is True:
                            self.__setting_write(endpoints)
                except Exception as e:
                    logger.error(
                        f"{self.__str__()} scheduler set setting raise exception: {e}"
                    )
            else:
                break
        logger.info(f"{self.__str__()} stop scheduler for update setting success.")

    def __schedule_clear_idle_rpc_channels(self):
        while True:
            if self.__client_thread_task_enabled is True:
                self.__clear_idle_rpc_channels_threading_event.wait(60)
                logger.debug(
                    f"{self.__str__()} run scheduler for clear idle rpc channels."
                )
                try:
                    if self.__client_thread_task_enabled is True:
                        self.__rpc_client.clear_idle_rpc_channels()
                except Exception as e:
                    logger.error(
                        f"{self.__str__()} run scheduler for clear idle rpc channels: {e}"
                    )
            else:
                break
        logger.info(
            f"{self.__str__()} stop scheduler for clear idle rpc channels success."
        )

    """ callback handler for async method """

    def __start_async_rpc_callback_handler(self):
        # to handle callback when using async method such as send_async(), receive_async().
        # switches user's callback thread from RpcClient's _io_loop_thread to client's client_callback_worker_thread
        try:
            workers = os.cpu_count()
            self.__client_callback_executor = ThreadPoolExecutor(
                max_workers=workers,
                thread_name_prefix=f"client_callback_worker-{self.__client_id}",
            )
            logger.info(
                f"{self.__str__()} start callback executor success. max_workers:{workers}"
            )
        except Exception as e:
            print(f"{self.__str__()} start async rpc callback raise exception: {e}")
            raise e

    @staticmethod
    def __handle_callback(callback_result):
        if callback_result.is_success:
            callback_result.future.set_result(callback_result.result)
        else:
            callback_result.future.set_exception(callback_result.result)

    """ protect """

    def _retrieve_topic_route_data(self, topic):
        route = self.__topic_route_cache.get(topic)
        if route is not None:
            return route
        else:
            route = self.__update_topic_route(topic)
            if route is not None:
                logger.info(f"{self.__str__()} update topic:{topic} route success.")
                self.__topics.add(topic)
                return route
            else:
                raise Exception(f"failed to fetch topic:{topic} route.")

    def _remove_unused_topic_route_data(self, topic):
        self.__topic_route_cache.remove(topic)
        self.__topics.remove(topic)

    def _sign(self):
        return Signature.metadata(self.__client_configuration, self.__client_id)

    def _rpc_channel_io_loop(self):
        return self.__rpc_client.get_channel_io_loop()

    def _submit_callback(self, callback_result):
        self.__client_callback_executor.submit(
            Client.__handle_callback, callback_result
        )

    """ private """

    # topic route #

    def __update_topic_route(self, topic):
        event = threading.Event()
        callback = functools.partial(
            self.__query_topic_route_async_callback, topic=topic, event=event
        )
        future = self.__rpc_client.query_topic_route_async(
            self.__client_configuration.rpc_endpoints,
            self.__topic_route_req(topic),
            metadata=self._sign(),
            timeout=self.__client_configuration.request_timeout,
        )
        future.add_done_callback(callback)
        event.wait()
        return self.__topic_route_cache.get(topic)

    def __update_topic_route_async(self, topic):
        callback = functools.partial(
            self.__query_topic_route_async_callback, topic=topic
        )
        future = self.__rpc_client.query_topic_route_async(
            self.__client_configuration.rpc_endpoints,
            self.__topic_route_req(topic),
            metadata=self._sign(),
            timeout=self.__client_configuration.request_timeout,
        )
        future.add_done_callback(callback)

    def __query_topic_route_async_callback(self, future, topic, event=None):
        try:
            res = future.result()
            self.__handle_topic_route_res(res, topic)
        except Exception as e:
            raise e
        finally:
            if event is not None:
                event.set()

    def __topic_route_req(self, topic):
        req = QueryRouteRequest()
        req.topic.name = topic
        req.topic.resource_namespace = self.__client_configuration.namespace
        req.endpoints.CopyFrom(self.__client_configuration.rpc_endpoints.endpoints)
        return req

    def __handle_topic_route_res(self, res, topic):
        if res is not None:
            MessagingResultChecker.check(res.status)
            if res.status.code == Code.OK:
                topic_route = TopicRouteData(res.message_queues)
                logger.info(
                    f"{self.__str__()} update topic:{topic} route, route info: {topic_route.__str__()}"
                )
                # if topic route has new endpoint, connect
                self.__check_topic_route_endpoints_changed(topic, topic_route)
                self.__topic_route_cache.put(topic, topic_route)
                # producer or consumer update its queue selector
                self._update_queue_selector(topic, topic_route)
                return topic_route
        else:
            raise Exception(f"query topic route exception, topic:{topic}")

    # heartbeat #

    def __heartbeat_async(self, endpoints):
        req = self._heartbeat_req()
        callback = functools.partial(self.__heartbeat_callback, endpoints=endpoints)
        future = self.__rpc_client.heartbeat_async(
            endpoints,
            req,
            metadata=self._sign(),
            timeout=self.__client_configuration.request_timeout,
        )
        future.add_done_callback(callback)

    def __heartbeat_callback(self, future, endpoints):
        try:
            res = future.result()
            if res is not None and res.status.code == Code.OK:
                logger.info(
                    f"{self.__str__()} send heartbeat to {endpoints.__str__()} success."
                )
            else:
                if res is not None:
                    logger.error(
                        f"{self.__str__()} send heartbeat to {endpoints.__str__()} error, code:{res.status.code}, message:{res.status.message}."
                    )
                else:
                    logger.error(
                        f"{self.__str__()} send heartbeat to {endpoints.__str__()} error, response is none."
                    )
        except Exception as e:
            logger.error(
                f"{self.__str__()} send heartbeat to {endpoints.__str__()} exception, e: {e}"
            )
            raise e

    # sync settings #

    def __retrieve_telemetry_stream_stream_call(self, endpoints, rebuild=False):
        try:
            self.__rpc_client.telemetry_stream(
                endpoints, self, self._sign(), rebuild, timeout=60 * 60 * 24 * 365
            )
        except Exception as e:
            logger.error(
                f"{self.__str__()} rebuild stream_steam_call to {endpoints.__str__()} exception: {e}"
                if rebuild
                else f"{self.__str__()} create stream_steam_call to {endpoints.__str__()} exception: {e}"
            )

    def __setting_write(self, endpoints):
        req = self._sync_setting_req(endpoints)
        callback = functools.partial(self.__setting_write_callback, endpoints=endpoints)
        future = self.__rpc_client.telemetry_write_async(endpoints, req)
        logger.debug(f"{self.__str__()} send setting to {endpoints.__str__()}, {req}")
        future.add_done_callback(callback)

    def __setting_write_callback(self, future, endpoints):
        try:
            future.result()
            logger.info(
                f"{self.__str__()} send setting to {endpoints.__str__()} success."
            )
        except InvalidStateError as e:
            logger.warn(
                f"{self.__str__()} send setting to {endpoints.__str__()} occurred InvalidStateError: {e}"
            )
            self.__retrieve_telemetry_stream_stream_call(endpoints, rebuild=True)
        except AioRpcError as e:
            logger.warn(
                f"{self.__str__()} send setting to {endpoints.__str__()} occurred AioRpcError: {e}"
            )
            self.__retrieve_telemetry_stream_stream_call(endpoints, rebuild=True)
        except Exception as e:
            logger.error(
                f"{self.__str__()} send setting to {endpoints.__str__()} exception: {e}"
            )
            self.__retrieve_telemetry_stream_stream_call(endpoints, rebuild=True)

    # metrics #

    def reset_metric(self, metric):
        self.__client_metrics.reset_metrics(metric)

    # client termination #

    def __client_termination(self, endpoints):
        req = self._notify_client_termination_req()
        future = self.__rpc_client.notify_client_termination(
            endpoints,
            req,
            metadata=self._sign(),
            timeout=self.__client_configuration.request_timeout,
        )
        future.result()

    # others ##

    def __get_all_endpoints(self):
        endpoints_map = {}
        all_route = self.__topic_route_cache.values()
        for topic_route in all_route:
            endpoints_map.update(topic_route.all_endpoints())
        return endpoints_map

    def __check_topic_route_endpoints_changed(self, topic, route):
        old_route = self.__topic_route_cache.get(topic)
        if old_route is None or old_route != route:
            logger.info(
                f"topic:{topic} route changed for {self.__str__()}. old route is {old_route}, new route is {route}"
            )
            all_endpoints = self.__get_all_endpoints()  # the existing endpoints
            topic_route_endpoints = (
                route.all_endpoints()
            )  # the latest endpoints for topic route
            diff = set(topic_route_endpoints.keys()).difference(
                set(all_endpoints.keys())
            )  # the diff between existing and latest
            # create grpc channel, stream_stream_call for new endpoints, send setting to new endpoints
            for address in diff:
                endpoints = topic_route_endpoints[address]
                self.__retrieve_telemetry_stream_stream_call(endpoints)
                self.__setting_write(endpoints)

    def __notify_client_termination(self):
        all_endpoints = self.__get_all_endpoints()
        for endpoints in all_endpoints.values():
            try:
                self.__client_termination(endpoints)
            except Exception as e:
                logger.error(f"notify client termination to {endpoints} exception: {e}")

    def __stop_client_threads(self):
        self.__client_thread_task_enabled = False
        if self.__topic_route_scheduler is not None:
            if self.__topic_route_scheduler_threading_event is not None:
                self.__topic_route_scheduler_threading_event.set()
            self.__topic_route_scheduler.join()

        if self.__heartbeat_scheduler is not None:
            if self.__heartbeat_scheduler_threading_event is not None:
                self.__heartbeat_scheduler_threading_event.set()
            self.__heartbeat_scheduler.join()

        if self.__sync_setting_scheduler is not None:
            if self.__sync_setting_scheduler_threading_event is not None:
                self.__sync_setting_scheduler_threading_event.set()
            self.__sync_setting_scheduler.join()

        if self.__clear_idle_rpc_channels_scheduler is not None:
            if self.__clear_idle_rpc_channels_threading_event is not None:
                self.__clear_idle_rpc_channels_threading_event.set()
            self.__clear_idle_rpc_channels_scheduler.join()

        if self.__client_callback_executor is not None:
            self.__client_callback_executor.shutdown()

        self.__topic_route_scheduler = None
        self.__topic_route_scheduler_threading_event = None
        self.__heartbeat_scheduler = None
        self.__heartbeat_scheduler_threading_event = None
        self.__sync_setting_scheduler = None
        self.__sync_setting_scheduler_threading_event = None
        self.__clear_idle_rpc_channels_scheduler = None
        self.__clear_idle_rpc_channels_threading_event = None
        self.__client_callback_executor = None

    """ property """

    @property
    def is_running(self):
        return self.__is_running

    @property
    def client_id(self):
        return self.__client_id

    @property
    def topics(self):
        return self.__topics

    @property
    def client_configuration(self):
        return self.__client_configuration

    @property
    def client_type(self):
        return self.__client_type

    @property
    def rpc_client(self):
        return self.__rpc_client

    @property
    def client_metrics(self):
        return self.__client_metrics
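
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the upstream module): a minimal
# concrete subclass showing which hooks Client expects a producer or consumer
# to override, and the lifecycle a caller drives. The hook names, the
# constructor signature, startup() and shutdown() all come from this file;
# the placeholder hook bodies, the MinimalClient name, and the "TestTopic"
# value are hypothetical and stand in for the protocol-specific request
# building done by the real producer/consumer subclasses.
#
#   class MinimalClient(Client):
#       def __init__(self, client_configuration, topics, client_type):
#           super().__init__(client_configuration, topics, client_type)
#
#       def _heartbeat_req(self):
#           return None  # a real subclass builds its heartbeat request here
#
#       def _sync_setting_req(self, endpoints):
#           return None  # a real subclass builds its telemetry settings command here
#
#       def _notify_client_termination_req(self):
#           return None  # a real subclass builds its termination request here
#
#       def _update_queue_selector(self, topic, topic_route):
#           pass  # a real subclass refreshes its queue selector from topic_route
#
# With a client_configuration built elsewhere (endpoints, credentials,
# namespace) and a valid client_type, the caller then drives the lifecycle:
#
#   client = MinimalClient(client_configuration, {"TestTopic"}, client_type)
#   client.startup()   # pre-fetches topic routes, starts the 4 schedulers and the callback executor
#   ...
#   client.shutdown()  # stops schedulers, notifies endpoints of termination, closes rpc channels
# ---------------------------------------------------------------------------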