python/rocketmq/v5/client/connection/rpc_client.py (259 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import asyncio import threading import time from concurrent.futures import Future from grpc import ChannelConnectivity from rocketmq.grpc_protocol import (AckMessageRequest, ChangeInvisibleDurationRequest, EndTransactionRequest, HeartbeatRequest, NotifyClientTerminationRequest, QueryRouteRequest, ReceiveMessageRequest, SendMessageRequest, TelemetryCommand) from rocketmq.v5.client.connection import RpcChannel, RpcEndpoints from rocketmq.v5.log import logger from rocketmq.v5.util import ConcurrentMap class RpcClient: _instance_lock = threading.Lock() _channel_lock = threading.Lock() _io_loop = None # event loop for all async io _io_loop_thread = None # thread for io loop RPC_CLIENT_MAX_IDLE_SECONDS = 60 * 30 def __init__(self, tls_enable=False): with RpcClient._instance_lock: # start an event loop for async io if RpcClient._io_loop is None: initialized_event = threading.Event() RpcClient._io_loop_thread = threading.Thread( target=RpcClient.__init_io_loop, args=(initialized_event,), name="channel_io_loop_thread", ) RpcClient._io_loop_thread.daemon = True RpcClient._io_loop_thread.start() # waiting for thread start success initialized_event.wait() self.channels = ConcurrentMap() self.__clean_idle_channel_scheduler = None self.__clean_idle_channel_scheduler_threading_event = None self.__enable_retrieve_channel = True self.__tls_enable = tls_enable def retrieve_or_create_channel(self, endpoints: RpcEndpoints): if self.__enable_retrieve_channel is False: raise Exception("RpcClient is not running.") try: # get or create a new grpc channel channel = self.__get_channel(endpoints) if channel is not None: channel.update_time = int(time.time()) else: with RpcClient._channel_lock: channel = RpcChannel(endpoints, self.__tls_enable) channel.create_channel(RpcClient.get_channel_io_loop()) self.__put_channel(endpoints, channel) return channel except Exception as e: logger.error(f"retrieve or create channel exception: {e}") raise e def clear_idle_rpc_channels(self): items = self.channels.items() now = int(time.time()) idle_endpoints = list() for endpoints, channel in items: if now - channel.update_time > RpcClient.RPC_CLIENT_MAX_IDLE_SECONDS: idle_endpoints.append(endpoints) if len(idle_endpoints) > 0: with RpcClient._channel_lock: for endpoints in idle_endpoints: logger.info(f"remove idle channel {endpoints.__str__()}") self.__close_rpc_channel(endpoints) self.channels.remove(endpoints) def stop(self): with RpcClient._channel_lock: self.__enable_retrieve_channel = False all_endpoints = self.channels.keys() for endpoints in all_endpoints: self.__close_rpc_channel(endpoints) @staticmethod def get_channel_io_loop(): return RpcClient._io_loop """ grpc MessageService """ def query_topic_route_async( self, endpoints: RpcEndpoints, req: QueryRouteRequest, metadata, timeout=3 ): return RpcClient.__run_message_service_async( self.__query_route_async_0( endpoints, req, metadata=metadata, timeout=timeout ) ) def send_message_async( self, endpoints: RpcEndpoints, req: SendMessageRequest, metadata, timeout=3 ): return RpcClient.__run_message_service_async( self.__send_message_0(endpoints, req, metadata=metadata, timeout=timeout) ) def receive_message_async( self, endpoints: RpcEndpoints, req: ReceiveMessageRequest, metadata, timeout=3 ): return RpcClient.__run_message_service_async( self.__receive_message_0(endpoints, req, metadata=metadata, timeout=timeout) ) def ack_message_async( self, endpoints: RpcEndpoints, req: AckMessageRequest, metadata, timeout=3 ): return RpcClient.__run_message_service_async( self.__ack_message_0(endpoints, req, metadata=metadata, timeout=timeout) ) def change_invisible_duration_async( self, endpoints: RpcEndpoints, req: ChangeInvisibleDurationRequest, metadata, timeout=3, ): return RpcClient.__run_message_service_async( self.__change_invisible_duration_0( endpoints, req, metadata=metadata, timeout=timeout ) ) def heartbeat_async( self, endpoints: RpcEndpoints, req: HeartbeatRequest, metadata, timeout=3 ): return RpcClient.__run_message_service_async( self.__heartbeat_async_0(endpoints, req, metadata=metadata, timeout=timeout) ) def telemetry_write_async(self, endpoints: RpcEndpoints, req: TelemetryCommand): return RpcClient.__run_message_service_async( self.retrieve_or_create_channel( endpoints ).telemetry_stream_stream_call.stream_write(req) ) def end_transaction_async( self, endpoints: RpcEndpoints, req: EndTransactionRequest, metadata, timeout=3 ): return RpcClient.__run_message_service_async( self.__end_transaction_0(endpoints, req, metadata=metadata, timeout=timeout) ) def notify_client_termination( self, endpoints: RpcEndpoints, req: NotifyClientTerminationRequest, metadata, timeout=3, ): return RpcClient.__run_message_service_async( self.__notify_client_termination_0( endpoints, req, metadata=metadata, timeout=timeout ) ) """ build stream_stream_call """ def telemetry_stream( self, endpoints: RpcEndpoints, client, metadata, rebuild, timeout=3000 ): # assert asyncio.get_running_loop() == RpcClient._io_loop try: channel = self.retrieve_or_create_channel(endpoints) stream = channel.async_stub.Telemetry( metadata=metadata, timeout=timeout, wait_for_ready=True ) channel.register_telemetry_stream_stream_call(stream, client) asyncio.run_coroutine_threadsafe( channel.telemetry_stream_stream_call.start_stream_read(), RpcClient.get_channel_io_loop(), ) logger.info( f"{client.__str__()} rebuild stream_steam_call to {endpoints.__str__()}." if rebuild else f"{client.__str__()} create stream_steam_call to {endpoints.__str__()}." ) return channel except Exception as e: raise e """ MessageService.stub impl """ async def __query_route_async_0( self, endpoints: RpcEndpoints, req: QueryRouteRequest, metadata, timeout=3 ): return await self.retrieve_or_create_channel(endpoints).async_stub.QueryRoute( req, metadata=metadata, timeout=timeout ) async def __send_message_0( self, endpoints: RpcEndpoints, req: SendMessageRequest, metadata, timeout=3 ): return await self.retrieve_or_create_channel(endpoints).async_stub.SendMessage( req, metadata=metadata, timeout=timeout ) async def __receive_message_0( self, endpoints: RpcEndpoints, req: ReceiveMessageRequest, metadata, timeout=3 ): return self.retrieve_or_create_channel(endpoints).async_stub.ReceiveMessage( req, metadata=metadata, timeout=timeout ) async def __ack_message_0( self, endpoints: RpcEndpoints, req: AckMessageRequest, metadata, timeout=3 ): return await self.retrieve_or_create_channel(endpoints).async_stub.AckMessage( req, metadata=metadata, timeout=timeout ) async def __heartbeat_async_0( self, endpoints: RpcEndpoints, req: HeartbeatRequest, metadata, timeout=3 ): return await self.retrieve_or_create_channel(endpoints).async_stub.Heartbeat( req, metadata=metadata, timeout=timeout ) async def __change_invisible_duration_0( self, endpoints: RpcEndpoints, req: ChangeInvisibleDurationRequest, metadata, timeout=3, ): return await self.retrieve_or_create_channel( endpoints ).async_stub.ChangeInvisibleDuration(req, metadata=metadata, timeout=timeout) async def __end_transaction_0( self, endpoints: RpcEndpoints, req: EndTransactionRequest, metadata, timeout=3 ): return await self.retrieve_or_create_channel( endpoints ).async_stub.EndTransaction(req, metadata=metadata, timeout=timeout) async def __notify_client_termination_0( self, endpoints: RpcEndpoints, req: NotifyClientTerminationRequest, metadata, timeout=3, ): return await self.retrieve_or_create_channel( endpoints ).async_stub.NotifyClientTermination(req, metadata=metadata, timeout=timeout) async def __create_channel_async(self, endpoints: RpcEndpoints): return self.retrieve_or_create_channel(endpoints) """ private """ def __get_channel(self, endpoints: RpcEndpoints) -> RpcChannel: return self.channels.get(endpoints) def __put_channel(self, endpoints: RpcEndpoints, channel): self.channels.put(endpoints, channel) def __close_rpc_channel(self, endpoints: RpcEndpoints): channel = self.__get_channel(endpoints) if ( channel is not None and channel.channel_state() is not ChannelConnectivity.SHUTDOWN ): try: channel.close_channel(RpcClient.get_channel_io_loop()) self.channels.remove(endpoints) except Exception as e: logger.error(f"close channel {endpoints.__str__()} error: {e}") raise e @staticmethod def __init_io_loop(initialized_event): # start a thread, set an event loop to the thread. all clients use the same event loop for io operation # loop only init once, running forever until the process ends # RpcClient use RpcClient._io_loop to execute grpc call try: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) RpcClient._io_loop = loop initialized_event.set() logger.info("start io loop thread success.") loop.run_forever() except Exception as e: logger.error(f"start io loop thread exception: {e}") @staticmethod def __run_message_service_async(func): try: # execute grpc call in RpcClient._io_loop return asyncio.run_coroutine_threadsafe( func, RpcClient.get_channel_io_loop() ) except Exception as e: future = Future() future.set_exception(e) return future