in datahub/client/consumer/shard_group_reader.py [0:0]
def __create_shard_reader(self, shard_ids, timestamp=-1):
with self._lock:
try:
if shard_ids is None or len(shard_ids) == 0:
return
shard_meta_map = self._coordinator.meta_data.shard_meta_map
shards_offset_map = self.__gen_shards_offset(shard_ids, timestamp)
for shard_id in shard_ids:
shard_meta = shard_meta_map.get(shard_id)
if shard_meta is None:
raise InvalidParameterException("Shard not found. key: {}, shard_id: {}".format(self._coordinator.uniq_key, shard_id))
if shard_id in self._shard_reader_map:
continue
consume_offset = shards_offset_map.get(shard_id)
reader = ShardReader(self._coordinator.project_name, self._coordinator.topic_name, self._coordinator.sub_id,
self._coordinator.meta_data.message_reader, shard_id, consume_offset, self._coordinator.fetch_limit)
self._shard_reader_map[shard_id] = reader
self._select_strategy.add_shard(shard_id)
self._logger.info("ShardReader created. key: {}, shard_id: {}, sequence: {}".format(self._coordinator.uniq_key, shard_id, consume_offset.sequence))
except DatahubException as e:
self._logger.warning("ShardReader create fail. key: {}, shard_ids: {}, DatahubException: {}".format(self._coordinator.uniq_key, shard_ids, e))
raise e
except Exception as e:
self._logger.warning("ShardReader create fail. key: {}, shard_ids: {}, {}".format(self._coordinator.uniq_key, shard_ids, e))
raise e