in datahub/client/consumer/shard_group_reader.py [0:0]
def __read_by_reader(self, reader):
record = None
try:
record = reader.read(1)
self._select_strategy.after_read(reader.shard_id, record)
if record:
self._coordinator.send_record_offset(record.record_key)
if self._coordinator.auto_ack_offset:
record.record_key.ack()
except ShardSealedException as e: # error_code: 'InvalidShardOperation'
self._logger.warning("Read fail. Shard read end. shard_id: {}, key: {}, {}"
.format(reader.shard_id, self._coordinator.uniq_key, e))
self._coordinator.on_shard_read_end([reader.shard_id])
except InvalidCursorException as e: # error_code: 'InvalidCursor'
self._logger.warning("Read fail. Invalid cursor. shard_id: {}, key: {}, {}"
.format(reader.shard_id, self._coordinator.uniq_key, e))
reader.reset_offset()
except DatahubException as e:
self._logger.warning("Read fail. shard_id: {}, key: {}. DatahubException: {}"
.format(reader.shard_id, self._coordinator.uniq_key, e))
raise e
except Exception as e:
self._logger.warning("Read fail. shard_id: {}, key: {}. Exception: {}"
.format(reader.shard_id, self._coordinator.uniq_key, e))
raise e
return record