in rocketmq/client.py [0:0]
def __init__(self, group_id, checker_callback, user_args=None, timeout=None, compress_level=None,
             max_message_size=None):
    """Create a transactional producer and register its transaction checker.

    Args:
        group_id: Producer group name; converted with ``_to_bytes`` before
            being passed to the native library.
        checker_callback: Callable invoked with a ``ReceivedMessage``. It must
            return ``TransactionStatus.COMMIT``, ``TransactionStatus.ROLLBACK``
            or ``TransactionStatus.UNKNOWN``.
        user_args: Passed unchanged to ``dll.CreateTransactionProducer``.
        timeout: If not ``None``, applied through ``set_timeout``.
        compress_level: If not ``None``, applied through ``set_compress_level``.
        max_message_size: If not ``None``, applied through
            ``set_max_message_size``.

    Raises:
        NullPointerException: If ``dll.CreateTransactionProducer`` returns
            ``None``.
    """
    # Function-scope import: resolved once at construction time so the
    # callback below never has to import while it is being invoked.
    import traceback

    super(TransactionMQProducer, self).__init__(group_id, timeout, compress_level, max_message_size)
    self._callback_refs = []
    valid_statuses = (TransactionStatus.UNKNOWN, TransactionStatus.COMMIT, TransactionStatus.ROLLBACK)

    def _on_check(producer, c_message, user_data):
        """Adapt the native check call to ``checker_callback``.

        Returns the status produced by ``checker_callback``. Any failure
        (including a return value that is not a valid status) is reported on
        stderr and answered with ``TransactionStatus.UNKNOWN``.
        """
        try:
            check_result = checker_callback(ReceivedMessage(c_message))
            if check_result not in valid_statuses:
                raise ValueError(
                    'Check transaction status error, please use enum \'TransactionStatus\' as response')
            return check_result
        except BaseException:
            # Do NOT re-raise here: raising (e.g. from a ``finally`` clause)
            # discards the return value, so the caller would never receive
            # UNKNOWN. Print the traceback to keep the failure visible, then
            # hand back a well-defined status.
            traceback.print_exc()
            return TransactionStatus.UNKNOWN

    transaction_checker_callback = TRANSACTION_CHECK_CALLBACK(_on_check)
    # Hold a reference on the instance so the wrapped callback object stays
    # alive as long as the producer does (presumably a ctypes callback type,
    # which must outlive its use by native code - TODO confirm).
    self._callback_refs.append(transaction_checker_callback)
    self._handle = dll.CreateTransactionProducer(_to_bytes(group_id), transaction_checker_callback, user_args)
    if self._handle is None:
        raise NullPointerException('Returned null pointer when create transaction producer')

    # NOTE(review): the base initializer already received these options; they
    # are applied again here after ``self._handle`` has been reassigned,
    # presumably because the setters act on the current handle - confirm.
    if timeout is not None:
        self.set_timeout(timeout)
    if compress_level is not None:
        self.set_compress_level(compress_level)
    if max_message_size is not None:
        self.set_max_message_size(max_message_size)