in rocketmq/client.py [0:0]
def subscribe(self, topic, callback, expression='*'):
def _on_message(consumer, msg):
exc = None
try:
consume_result = callback(ReceivedMessage(msg))
if consume_result != ConsumeStatus.CONSUME_SUCCESS and consume_result != ConsumeStatus.RECONSUME_LATER:
raise ValueError('Consume status error, please use enum \'ConsumeStatus\' as response')
return consume_result
except BaseException as e:
exc = e
return ConsumeStatus.RECONSUME_LATER
finally:
if exc:
raise exc
ffi_check(dll.Subscribe(self._handle, _to_bytes(topic), _to_bytes(expression)))
self._register_callback(_on_message)