def get_records()

in datahub/client/consumer/message_reader.py [0:0]


    def get_records(self, shard_id, cursor, fetch_limit):
        """Read records from a shard through the DataHub client.

        The call is dispatched on the topic's record type: TUPLE topics use
        the client's ``get_tuple_records`` (which also receives the topic's
        record schema), BLOB topics use ``get_blob_records``.

        :param shard_id: forwarded unchanged to the client call.
        :param cursor: forwarded unchanged to the client call.
        :param fetch_limit: forwarded unchanged to the client call
            (presumably the maximum number of records -- confirm against the
            client API).
        :return: whatever the underlying client call returns.
        :raises InvalidParameterException: if the topic's record type is
            neither TUPLE nor BLOB.
        :raises Exception: any error raised by the client call (including
            DatahubException) is logged at warning level and then re-raised.
        """
        meta = self._meta_data.topic_meta
        client = self._meta_data.datahub_client

        # TUPLE topics: the record schema is passed along with the request.
        if meta.record_type == RecordType.TUPLE:
            try:
                fetched = client.get_tuple_records(
                    meta.project_name, meta.topic_name, shard_id,
                    meta.record_schema, cursor, fetch_limit)
            except DatahubException as err:
                self._logger.warning(
                    "Get TUPLE record fail. shard_id: {}, cursor: {}, DatahubException: {}".format(
                        shard_id, cursor, err))
                raise err
            except Exception as err:
                self._logger.warning(
                    "Get TUPLE record fail. shard_id: {}, cursor: {}, {}".format(
                        shard_id, cursor, err))
                raise err
            return fetched

        # BLOB topics: no schema argument is passed.
        if meta.record_type == RecordType.BLOB:
            try:
                fetched = client.get_blob_records(
                    meta.project_name, meta.topic_name, shard_id,
                    cursor, fetch_limit)
            except DatahubException as err:
                self._logger.warning(
                    "Get BLOB record fail. shard_id: {}, cursor: {}, DatahubException: {}".format(
                        shard_id, cursor, err))
                raise err
            except Exception as err:
                self._logger.warning(
                    "Get BLOB record fail. shard_id: {}, cursor: {}, {}".format(
                        shard_id, cursor, err))
                raise err
            return fetched

        # Neither supported record type matched.
        self._logger.warning("Invalid record type, should be TUPLE or BLOB!")
        raise InvalidParameterException("Invalid record type")