in datahub/client/producer/shard_group_writer.py [0:0]
def __create_shard_writer(self, shard_ids):
    """Create and register a ShardWriter for each shard id that lacks one.

    Shard ids already present in ``self._shard_writer_map`` are skipped.
    For every other id, the shard metadata is looked up in the
    coordinator's ``shard_meta_map``; a new ShardWriter is built and then
    recorded in ``self._shard_writer_map`` and ``self._active_shard``.
    The whole operation runs while holding ``self._lock``.

    Args:
        shard_ids: Iterable of shard ids to create writers for.

    Raises:
        DatahubException: If a shard has no metadata or its state is not
            ``ShardState.ACTIVE``.
        Exception: Any other error raised while reading the metadata or
            building a writer is logged and re-raised unchanged.
    """
    # Bound up front so the failure log below can always be formatted, even
    # when the error happens before the loop reaches its first shard.
    shard_id = None
    with self._lock:
        try:
            shard_meta_map = self._coordinator.meta_data.shard_meta_map
            for shard_id in shard_ids:
                if shard_id in self._shard_writer_map:
                    continue

                shard_meta = shard_meta_map.get(shard_id)
                if not shard_meta or shard_meta.shard_state != ShardState.ACTIVE:
                    self._logger.warning(
                        "ShardWriter create fail. May the shard is not active. key: {}. shard_id: {}"
                        .format(self._coordinator.uniq_key, shard_id)
                    )
                    raise DatahubException("ShardWriter create fail. May the shard is not active")

                # Build the writer before touching any bookkeeping: if the
                # constructor raises, the shard must not be left marked
                # active without a writer behind it.
                shard_writer = ShardWriter(
                    self._coordinator.project_name,
                    self._coordinator.topic_name,
                    self._coordinator.sub_id,
                    self._coordinator.meta_data.message_writer,
                    self._producer_config,
                    shard_id,
                )
                self._shard_writer_map[shard_id] = shard_writer
                self._active_shard.append(shard_id)
                self._logger.info(
                    "ShardWriter create success. key: {}, shard_id: {}"
                    .format(self._coordinator.uniq_key, shard_id)
                )
        except Exception as e:
            self._logger.warning(
                "ShardWriter create fail. key: {}. shard_id: {}, {}"
                .format(self._coordinator.uniq_key, shard_id, e)
            )
            # Bare raise keeps the original exception and traceback intact.
            raise