in rocketmq/client.py [0:0]
def send_message_in_transaction(self, message, local_execute, user_args=None):
    """Send ``message`` transactionally, running ``local_execute`` as the local step.

    ``local_execute`` is wrapped in a native callback and handed to
    ``dll.SendMessageTransaction`` together with ``message`` and ``user_args``.

    Args:
        message: The message to send; forwarded unchanged to the native call.
        local_execute: Callable invoked as ``local_execute(msg, usr_args)``
            where ``msg`` is a ``ReceivedMessage`` built from the native
            message. It must return ``TransactionStatus.UNKNOWN``,
            ``TransactionStatus.COMMIT`` or ``TransactionStatus.ROLLBACK``.
        user_args: Opaque value forwarded to the native call (presumably
            handed back to the callback as ``usr_args`` -- confirm against
            the native API).

    Returns:
        A ``SendResult`` built from the native result's ``sendStatus``,
        UTF-8 decoded ``msgId`` and ``offset``.

    Note:
        If ``local_execute`` raises, or returns anything other than one of
        the three statuses above, the failure is logged and
        ``TransactionStatus.UNKNOWN`` is reported to the native side.
        Errors signalled by ``ffi_check`` propagate to the caller.
    """
    # Function-scope import so this method does not depend on the module
    # header already importing logging.
    import logging

    accepted_statuses = (
        TransactionStatus.UNKNOWN,
        TransactionStatus.COMMIT,
        TransactionStatus.ROLLBACK,
    )

    def _on_local_execute(producer, c_message, usr_args):
        try:
            py_message = ReceivedMessage(c_message)
            local_result = local_execute(py_message, usr_args)
            if local_result not in accepted_statuses:
                raise ValueError(
                    'Local transaction status error, please use enum \'TransactionStatus\' as response')
            return local_result
        except BaseException:
            # This function runs as a callback invoked from native code, so a
            # raised exception cannot reach the caller of
            # send_message_in_transaction. Re-raising here (as the previous
            # implementation did from a ``finally`` clause) discarded the
            # UNKNOWN fallback. Log the failure and report UNKNOWN instead.
            # BaseException is kept deliberately: whatever goes wrong, the
            # native side must still receive a well-defined status.
            logging.getLogger(__name__).exception(
                'Local transaction execution failed, reporting TransactionStatus.UNKNOWN')
            return TransactionStatus.UNKNOWN

    local_execute_callback = LOCAL_TRANSACTION_EXECUTE_CALLBACK(_on_local_execute)
    # Track the callback object on the instance for the duration of the
    # native call; it is dropped again in the ``finally`` below.
    self._callback_refs.append(local_execute_callback)

    result = _CSendResult()
    try:
        ffi_check(
            dll.SendMessageTransaction(self._handle,
                                       message,
                                       local_execute_callback,
                                       user_args,
                                       ctypes.pointer(result)))
    finally:
        self._callback_refs.remove(local_execute_callback)

    return SendResult(
        SendStatus(result.sendStatus),
        result.msgId.decode('utf-8'),
        result.offset
    )