def __read_by_reader()

in datahub/client/consumer/shard_group_reader.py [0:0]


    def __read_by_reader(self, reader):
        """Read from ``reader`` once and report the result to the coordinator.

        Calls ``reader.read(1)`` and passes the outcome, together with the
        reader's shard id, to ``self._select_strategy.after_read``. When a
        truthy record comes back, its ``record_key`` is sent to the
        coordinator via ``send_record_offset`` and, if the coordinator's
        ``auto_ack_offset`` is set, acknowledged immediately.

        NOTE(review): the meaning of the ``1`` passed to ``reader.read`` is
        not visible here (timeout vs. count) -- confirm against the reader.

        :param reader: shard reader exposing ``read``, ``shard_id`` and
            ``reset_offset``.
        :return: whatever ``reader.read(1)`` returned, or ``None`` if the read
            itself raised ``ShardSealedException`` or
            ``InvalidCursorException``.
        :raises DatahubException: any other DataHub error, logged and
            re-raised unchanged.
        :raises Exception: any other error, logged and re-raised unchanged.
        """
        record = None
        try:
            record = reader.read(1)
            # The strategy is notified on every successful read call, even
            # when no record was returned.
            self._select_strategy.after_read(reader.shard_id, record)
            if record:
                self._coordinator.send_record_offset(record.record_key)
                if self._coordinator.auto_ack_offset:
                    record.record_key.ack()
        # The specific handlers must stay ahead of the DatahubException one;
        # presumably both types derive from it -- verify in the exceptions module.
        except ShardSealedException as e:  # error_code: 'InvalidShardOperation'
            # Swallowed: the coordinator is told this shard has been read to its end.
            self._logger.warning("Read fail. Shard read end. shard_id: {}, key: {}, {}"
                                 .format(reader.shard_id, self._coordinator.uniq_key, e))
            self._coordinator.on_shard_read_end([reader.shard_id])
        except InvalidCursorException as e:  # error_code: 'InvalidCursor'
            # Swallowed: the reader's offset is reset so a later call can retry.
            self._logger.warning("Read fail. Invalid cursor. shard_id: {}, key: {}, {}"
                                 .format(reader.shard_id, self._coordinator.uniq_key, e))
            reader.reset_offset()
        except DatahubException as e:
            self._logger.warning("Read fail. shard_id: {}, key: {}. DatahubException: {}"
                                 .format(reader.shard_id, self._coordinator.uniq_key, e))
            # Bare ``raise`` keeps the original traceback pointing at the
            # failing call instead of restarting it at this handler.
            raise
        except Exception as e:
            self._logger.warning("Read fail. shard_id: {}, key: {}. Exception: {}"
                                 .format(reader.shard_id, self._coordinator.uniq_key, e))
            raise
        return record