python/rocketmq/v5/producer/producer.py (457 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import abc import functools import threading import time from concurrent.futures import Future, ThreadPoolExecutor from rocketmq.grpc_protocol import (ClientType, Code, Encoding, EndTransactionRequest, HeartbeatRequest, MessageType, NotifyClientTerminationRequest, Publishing, SendMessageRequest, Settings, TelemetryCommand, TransactionResolution, TransactionSource) from rocketmq.v5.client import Client from rocketmq.v5.client.balancer import QueueSelector from rocketmq.v5.exception import (ClientException, IllegalArgumentException, IllegalStateException, TooManyRequestsException) from rocketmq.v5.log import logger from rocketmq.v5.model import CallbackResult, Message, SendReceipt from rocketmq.v5.util import (ConcurrentMap, MessageIdCodec, MessagingResultChecker, Misc) class Transaction: __transaction_lock = threading.Lock() def __init__(self, producer): self.__message = None self.__send_receipt = None self.__producer = producer def add_half_message(self, message: Message): with Transaction.__transaction_lock: if message is None: raise IllegalArgumentException( "add half message error, message is none." ) if self.__message is None: self.__message = message else: raise IllegalArgumentException( f"message already existed in transaction, topic:{message.topic}" ) def add_send_receipt(self, send_receipt): with Transaction.__transaction_lock: if self.__message is None: raise IllegalArgumentException( "add send receipt error, no message in transaction." ) if send_receipt is None: raise IllegalArgumentException( "add send receipt error, send receipt in none." ) if self.__message.message_id != send_receipt.message_id: raise IllegalArgumentException( "can't add another send receipt to a half message." ) self.__send_receipt = send_receipt def commit(self): return self.__commit_or_rollback(TransactionResolution.COMMIT) def rollback(self): return self.__commit_or_rollback(TransactionResolution.ROLLBACK) def __commit_or_rollback(self, result): if self.__message is None: raise IllegalArgumentException("no message in transaction.") if self.__send_receipt is None or self.__send_receipt.transaction_id is None: raise IllegalArgumentException( "no transaction_id in transaction, must send half message at first." ) try: res = self.__producer.end_transaction( self.__send_receipt.message_queue.endpoints, self.__message, self.__send_receipt.transaction_id, result, TransactionSource.SOURCE_CLIENT, ) if res.status.code != Code.OK: logger.error( f"transaction commit or rollback error. topic:{self.__message.topic}, message_id:{self.__message.message_id}, transaction_id:{self.__send_receipt.transaction_id}, transactionResolution:{result}" ) raise ClientException(res.status.message, res.status.code) return res except Exception as e: logger.error( f"end transaction error, topic:{self.__message.topic}, message_id:{self.__send_receipt.message_id}, transaction_id:{self.__send_receipt.transaction_id}, transactionResolution:{result}: {e}" ) raise e """ property """ @property def message_id(self): return self.__message.message_id class TransactionChecker(metaclass=abc.ABCMeta): @abc.abstractmethod def check(self, message: Message) -> TransactionResolution: pass class Producer(Client): MAX_SEND_ATTEMPTS = 3 # max retry times when send failed def __init__( self, client_configuration, topics=None, checker=None, tls_enable=False ): super().__init__(client_configuration, topics, ClientType.PRODUCER, tls_enable) # {topic, QueueSelector} self.__send_queue_selectors = ConcurrentMap() self.__checker = ( checker # checker for transaction message, handle checking from server ) self.__transaction_check_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="transaction_check_worker") def __str__(self): return f"{ClientType.Name(self.client_type)} client_id:{self.client_id}" # send message # def send(self, message: Message, transaction=None) -> SendReceipt: if self.is_running is False: raise IllegalStateException("producer is not running now.") self.__wrap_sending_message(message, False if transaction is None else True) topic_queue = self.__select_send_queue(message) if message.message_type not in topic_queue.accept_message_types: raise IllegalArgumentException( f"current message type not match with queue accept message types, topic:{message.topic}, message_type:{Message.message_type_desc(message.message_type)}, queue access type:{topic_queue.accept_message_types_desc()}" ) if transaction is None: try: return self.__send(message, topic_queue) except Exception as e: logger.error(f"send message exception, topic: {message.topic}, e: {e}") raise e else: try: transaction.add_half_message(message) send_receipt = self.__send(message, topic_queue) message.message_id = send_receipt.message_id transaction.add_send_receipt(send_receipt) return send_receipt except IllegalArgumentException as e: raise e except Exception as e: logger.error( f"send transaction message exception, topic: {message.topic}, e: {e}" ) raise e def send_async(self, message: Message): if self.is_running is False: raise IllegalStateException("producer is not running now.") self.__wrap_sending_message(message, False) topic_queue = self.__select_send_queue(message) if message.message_type not in topic_queue.accept_message_types: raise IllegalArgumentException( f"current message type not match with queue accept message types, " f"topic:{message.topic}, message_type:{Message.message_type_desc(message.message_type)}, " f"queue access type:{topic_queue.accept_message_types_desc()}" ) try: return self.__send_async(message, topic_queue) except Exception as e: logger.error(f"send message exception, topic: {message.topic}, {e}") raise e # transaction # def begin_transaction(self): if self.is_running is False: raise IllegalStateException( "unable to begin transaction because producer is not running" ) if self.__checker is None: raise IllegalArgumentException("Transaction checker should not be null.") return Transaction(self) def end_transaction(self, endpoints, message, transaction_id, result, source): if self.is_running is False: raise IllegalStateException( "unable to end transaction because producer is not running" ) if self.__checker is None: raise IllegalArgumentException("Transaction checker should not be null.") req = self.__end_transaction_req(message, transaction_id, result, source) future = self.rpc_client.end_transaction_async( endpoints, req, metadata=self._sign(), timeout=self.client_configuration.request_timeout, ) return future.result() def on_recover_orphaned_transaction_command( self, endpoints, msg, transaction_id ): # call this function from server side stream, in RpcClient._io_loop try: if self.is_running is False: raise IllegalStateException( "unable to recover orphaned transaction command because producer is not running" ) if self.__checker is None: raise IllegalArgumentException("No transaction checker registered.") message = Message().fromProtobuf(msg) self.__transaction_check_executor.submit(self.__server_transaction_check, endpoints, message, transaction_id) except Exception as e: logger.error(f"on_recover_orphaned_transaction_command exception: {e}") """ override """ def _start_success(self): logger.info(f"{self.__str__()} start success.") def _start_failure(self): logger.error(f"{self.__str__()} start failed.") def _sync_setting_req(self, endpoints): # publishing pub = Publishing() topics = self.topics for topic in topics: resource = pub.topics.add() resource.name = topic resource.resource_namespace = self.client_configuration.namespace pub.max_body_size = 1024 * 1024 * 128 pub.validate_message_type = True # setting settings = Settings() settings.client_type = self.client_type settings.access_point.CopyFrom(endpoints.endpoints) settings.request_timeout.seconds = self.client_configuration.request_timeout settings.publishing.CopyFrom(pub) settings.user_agent.language = Misc.sdk_language() settings.user_agent.version = Misc.sdk_version() settings.user_agent.platform = Misc.get_os_description() settings.user_agent.hostname = Misc.get_local_ip() settings.metric.on = False cmd = TelemetryCommand() cmd.settings.CopyFrom(settings) return cmd def _heartbeat_req(self): req = HeartbeatRequest() req.client_type = self.client_type return req def _notify_client_termination_req(self): return NotifyClientTerminationRequest() def _update_queue_selector(self, topic, topic_route): queue_selector = self.__send_queue_selectors.get(topic) if queue_selector is None: return queue_selector.update(topic_route) def shutdown(self): logger.info(f"begin to shutdown {self.__str__()}") self.__transaction_check_executor.shutdown() self.__transaction_check_executor = None super().shutdown() logger.info(f"shutdown {self.__str__()} success.") """ private """ def __send(self, message: Message, topic_queue, attempt=1) -> SendReceipt: req = self.__send_req(message) send_context = self.client_metrics.send_before(message.topic) print(f"{topic_queue}") send_message_future = self.rpc_client.send_message_async( topic_queue.endpoints, req, self._sign(), timeout=self.client_configuration.request_timeout, ) return self.__handle_sync_send_receipt( send_message_future, message, topic_queue, attempt, send_context ) def __handle_sync_send_receipt( self, send_message_future, message, topic_queue, attempt, send_metric_context=None, ): try: send_receipt = self.__process_send_message_response( send_message_future, topic_queue ) self.client_metrics.send_after(send_metric_context, True) return send_receipt except Exception as e: attempt += 1 retry_exception_future = self.__check_send_retry_condition( message, topic_queue, attempt, e ) if retry_exception_future is not None: # end retry with exception self.client_metrics.send_after(send_metric_context, False) raise retry_exception_future.exception() # resend message topic_queue = self.__select_send_queue(message) return self.__send(message, topic_queue, attempt) def __send_async(self, message: Message, topic_queue, attempt=1, ret_future=None): req = self.__send_req(message) send_context = self.client_metrics.send_before(message.topic) send_message_future = self.rpc_client.send_message_async( topic_queue.endpoints, req, self._sign(), timeout=self.client_configuration.request_timeout, ) if ret_future is None: ret_future = Future() handle_send_receipt_callback = functools.partial( self.__handle_async_send_receipt, message=message, topic_queue=topic_queue, attempt=attempt, ret_future=ret_future, send_metric_context=send_context, ) send_message_future.add_done_callback(handle_send_receipt_callback) return ret_future def __handle_async_send_receipt( self, send_message_future, message, topic_queue, attempt, ret_future, send_metric_context=None, ): try: send_receipt = self.__process_send_message_response( send_message_future, topic_queue ) self.client_metrics.send_after(send_metric_context, True) self._submit_callback( CallbackResult.async_send_callback_result(ret_future, send_receipt) ) except Exception as e: attempt += 1 retry_exception_future = self.__check_send_retry_condition( message, topic_queue, attempt, e ) if retry_exception_future is not None: # end retry with exception self.client_metrics.send_after(send_metric_context, False) self._submit_callback( CallbackResult.async_send_callback_result( ret_future, retry_exception_future.exception(), False ) ) return # resend message topic_queue = self.__select_send_queue(message) self.__send_async(message, topic_queue, attempt, ret_future) def __process_send_message_response(self, send_message_future, topic_queue): res = send_message_future.result() MessagingResultChecker.check(res.status) entries = res.entries assert ( len(entries) == 1 ), f"entries size error, the send response entries size is {len(entries)}, {self.__str__()}" entry = entries[0] return SendReceipt( entry.message_id, entry.transaction_id, topic_queue, entry.offset ) def __check_send_retry_condition(self, message, topic_queue, attempt, e): end_retry = False if attempt > Producer.MAX_SEND_ATTEMPTS: logger.error( f"{self.__str__()} failed to send message to {topic_queue.endpoints.__str__()}, because of run out of attempt times, topic:{message.topic}, message_id:{message.message_id}, message_type:{message.message_type}, attempt:{attempt}" ) end_retry = True # end retry if system busy if isinstance(e, TooManyRequestsException): logger.error( f"{self.__str__()} failed to send message to {topic_queue.endpoints.__str__()}, because of to too many requests, topic:{message.topic}, message_type:{message.message_type}, message_id:{message.message_id}, attempt:{attempt}" ) end_retry = True if end_retry: send_exception_future = Future() send_exception_future.set_exception(e) return send_exception_future else: return None def __wrap_sending_message(self, message, is_transaction): message.message_id = MessageIdCodec().next_message_id() message.message_type = self.__send_message_type(message, is_transaction) def __send_req(self, message: Message): try: req = SendMessageRequest() msg = req.messages.add() msg.topic.name = message.topic msg.topic.resource_namespace = self.client_configuration.namespace if message.body is None or len(message.body) == 0: raise IllegalArgumentException("message body is none.") max_body_size = 4 * 1024 * 1024 # max body size is 4m if len(message.body) > max_body_size: raise IllegalArgumentException( f"Message body size exceeds the threshold, max size={max_body_size} bytes" ) msg.body = message.body if message.tag is not None: msg.system_properties.tag = message.tag if message.keys is not None: msg.system_properties.keys.extend(message.keys) if message.properties is not None: msg.user_properties.update(message.properties) msg.system_properties.message_id = message.message_id msg.system_properties.message_type = message.message_type msg.system_properties.born_timestamp.seconds = int(time.time()) msg.system_properties.born_host = Misc.get_local_ip() msg.system_properties.body_encoding = Encoding.IDENTITY if message.message_group is not None: msg.system_properties.message_group = message.message_group if message.delivery_timestamp is not None: msg.system_properties.delivery_timestamp.seconds = ( message.delivery_timestamp ) return req except Exception as e: raise e def __send_message_type(self, message: Message, is_transaction=False): if ( message.message_group is None and message.delivery_timestamp is None and is_transaction is False ): return MessageType.NORMAL if message.message_group is not None and is_transaction is False: return MessageType.FIFO if message.delivery_timestamp is not None and is_transaction is False: return MessageType.DELAY if ( message.message_group is None and message.delivery_timestamp is None and is_transaction is True ): return MessageType.TRANSACTION # transaction semantics is conflicted with fifo/delay. logger.error( f"{self.__str__()} set send message type exception, message: {str(message)}" ) raise IllegalArgumentException( "transactional message should not set messageGroup or deliveryTimestamp" ) def __select_send_queue(self, message): try: route = self._retrieve_topic_route_data(message.topic) queue_selector = self.__send_queue_selectors.put_if_absent( message.topic, QueueSelector.producer_queue_selector(route) ) if message.message_group is None: return queue_selector.select_next_queue() else: return queue_selector.select_queue_by_hash_key(message.message_group) except Exception as e: logger.error(f"producer select topic:{message.topic} queue raise exception, {e}") raise e def __end_transaction_req(self, message: Message, transaction_id, result, source): req = EndTransactionRequest() req.topic.name = message.topic req.topic.resource_namespace = self.client_configuration.namespace req.message_id = message.message_id req.transaction_id = transaction_id req.resolution = result req.source = source return req def __server_transaction_check_callback(self, future, message, transaction_id, result): try: res = future.result() if res is not None and res.status.code == Code.OK: if result == TransactionResolution.COMMIT: logger.debug( f"{self.__str__()} commit message. message_id: {message.message_id}, transaction_id: {transaction_id}, res: {res}" ) elif result == TransactionResolution.ROLLBACK: logger.debug( f"{self.__str__()} rollback message. message_id: {message.message_id}, transaction_id: {transaction_id}, res: {res}" ) else: if result == TransactionResolution.COMMIT: raise Exception(f"{self.__str__()} commit message: {message.message_id} raise exception") elif result == TransactionResolution.ROLLBACK: raise Exception(f"{self.__str__()} rollback message: {message.message_id} raise exception") except Exception as e: logger.error(f"server transaction check raise exception, {e}") def __server_transaction_check(self, endpoints, message, transaction_id): try: result = self.__checker.check(message) req = self.__end_transaction_req(message, transaction_id, result, TransactionSource.SOURCE_SERVER_CHECK) future = self.rpc_client.end_transaction_async(endpoints, req, metadata=self._sign(), timeout=self.client_configuration.request_timeout) future.add_done_callback(functools.partial(self.__server_transaction_check_callback, message=message, transaction_id=transaction_id, result=result)) except Exception as e: raise e