in datahub/client/consumer/message_reader.py [0:0]
def get_records(self, shard_id, cursor, fetch_limit):
topic_meta = self._meta_data.topic_meta
datahub_client = self._meta_data.datahub_client
if topic_meta.record_type == RecordType.TUPLE:
try:
return datahub_client.get_tuple_records(topic_meta.project_name, topic_meta.topic_name, shard_id,
topic_meta.record_schema, cursor, fetch_limit)
except DatahubException as e:
self._logger.warning("Get TUPLE record fail. shard_id: {}, cursor: {}, DatahubException: {}".format(shard_id, cursor, e))
raise e
except Exception as e:
self._logger.warning("Get TUPLE record fail. shard_id: {}, cursor: {}, {}".format(shard_id, cursor, e))
raise e
elif topic_meta.record_type == RecordType.BLOB:
try:
return datahub_client.get_blob_records(topic_meta.project_name, topic_meta.topic_name, shard_id,
cursor, fetch_limit)
except DatahubException as e:
self._logger.warning("Get BLOB record fail. shard_id: {}, cursor: {}, DatahubException: {}".format(shard_id, cursor, e))
raise e
except Exception as e:
self._logger.warning("Get BLOB record fail. shard_id: {}, cursor: {}, {}".format(shard_id, cursor, e))
raise e
else:
self._logger.warning("Invalid record type, should be TUPLE or BLOB!")
raise InvalidParameterException("Invalid record type")