def __create_shard_writer()

in datahub/client/producer/shard_group_writer.py [0:0]


    def __create_shard_writer(self, shard_ids):
        with self._lock:
            try:
                shard_meta_map = self._coordinator.meta_data.shard_meta_map
                for shard_id in shard_ids:
                    if shard_id not in self._shard_writer_map:
                        shard_meta = shard_meta_map.get(shard_id)
                        if not shard_meta or shard_meta.shard_state != ShardState.ACTIVE:
                            self._logger.warning("ShardWriter create fail. May the shard is not active. key: {}. shard_id: {}"
                                                 .format(self._coordinator.uniq_key, shard_id))
                            raise DatahubException("ShardWriter create fail. May the shard is not active")
                        self._active_shard.append(shard_id)
                        self._shard_writer_map[shard_id] = ShardWriter(
                            self._coordinator.project_name,
                            self._coordinator.topic_name,
                            self._coordinator.sub_id,
                            self._coordinator.meta_data.message_writer,
                            self._producer_config,
                            shard_id
                        )
                        self._logger.info("ShardWriter create success. key: {}, shard_id: {}".format(self._coordinator.uniq_key, shard_id))
            except Exception as e:
                self._logger.warning("ShardWriter create fail. key: {}. shard_id: {}, {}".format(self._coordinator.uniq_key, shard_id, e))
                raise e