in datahub/client/consumer/message_reader.py [0:0]
def get_cursor(self, shard_id, offset):
    """Return the cursor from which reading of ``shard_id`` should start.

    A truthy ``offset.next_cursor`` is returned unchanged. Otherwise cursors
    are requested through ``self.__get_cursor_once`` in priority order,
    stopping at the first truthy result:

    1. ``CursorType.SEQUENCE`` with ``offset.sequence`` (skipped when -1),
    2. ``CursorType.SYSTEM_TIME`` with ``offset.timestamp`` (skipped when -1),
    3. ``CursorType.OLDEST`` with -1.

    Args:
        shard_id: Shard identifier, forwarded to the cursor lookup.
        offset: Object exposing ``next_cursor``, ``sequence`` and
            ``timestamp`` attributes.

    Returns:
        ``offset.next_cursor`` when truthy, else the first truthy cursor
        produced by the lookups above.

    Raises:
        DatahubException: If every attempted lookup returned a falsy value.
    """
    if offset.next_cursor:
        return offset.next_cursor

    def fallback_plan():
        # Lazy on purpose: a later strategy (and the offset field it reads)
        # is only evaluated once the previous attempt produced no cursor.
        if offset.sequence != -1:
            yield CursorType.SEQUENCE, offset.sequence
        if offset.timestamp != -1:
            yield CursorType.SYSTEM_TIME, offset.timestamp
        yield CursorType.OLDEST, -1

    for cursor_type, parm in fallback_plan():
        cursor = self.__get_cursor_once(shard_id, cursor_type, parm)
        if cursor:
            break
    else:
        # Every strategy failed; cursor_type/parm hold the last one tried
        # (always the OLDEST fallback).
        failure_msg = "Init cursor failed. key: {}, shard_id: {}, cursor type: {}, parm: {}"
        self._logger.warning(
            failure_msg.format(self._meta_data.class_key, shard_id, cursor_type, parm)
        )
        raise DatahubException(
            "Get cursor fail. key: {}, shard_id: {}".format(self._meta_data.class_key, shard_id)
        )

    success_msg = "Init cursor success. key: {}, shard_id: {}, cursor type: {}, parm: {}, cursor: {}"
    self._logger.info(
        success_msg.format(self._meta_data.class_key, shard_id, cursor_type, parm, cursor)
    )
    return cursor