def __create_shard_reader()

in datahub/client/consumer/shard_group_reader.py [0:0]


    def __create_shard_reader(self, shard_ids, timestamp=-1):
        with self._lock:
            try:
                if shard_ids is None or len(shard_ids) == 0:
                    return
                shard_meta_map = self._coordinator.meta_data.shard_meta_map
                shards_offset_map = self.__gen_shards_offset(shard_ids, timestamp)
                for shard_id in shard_ids:
                    shard_meta = shard_meta_map.get(shard_id)
                    if shard_meta is None:
                        raise InvalidParameterException("Shard not found. key: {}, shard_id: {}".format(self._coordinator.uniq_key, shard_id))
                    if shard_id in self._shard_reader_map:
                        continue
                    consume_offset = shards_offset_map.get(shard_id)
                    reader = ShardReader(self._coordinator.project_name, self._coordinator.topic_name, self._coordinator.sub_id,
                                         self._coordinator.meta_data.message_reader, shard_id, consume_offset, self._coordinator.fetch_limit)
                    self._shard_reader_map[shard_id] = reader
                    self._select_strategy.add_shard(shard_id)
                    self._logger.info("ShardReader created. key: {}, shard_id: {}, sequence: {}".format(self._coordinator.uniq_key, shard_id, consume_offset.sequence))
            except DatahubException as e:
                self._logger.warning("ShardReader create fail. key: {}, shard_ids: {}, DatahubException: {}".format(self._coordinator.uniq_key, shard_ids, e))
                raise e
            except Exception as e:
                self._logger.warning("ShardReader create fail. key: {}, shard_ids: {}, {}".format(self._coordinator.uniq_key, shard_ids, e))
                raise e