in registry/zookeeper/src/lib.rs [266:312]
fn subscribe(&self, url: Url, listener: RegistryNotifyListener) -> Result<(), StdError> {
let service_name = url.get_service_name();
let zk_path = format!("/{}/{}/{}", DUBBO_KEY, &service_name, PROVIDERS_KEY);
if self
.listeners
.read()
.unwrap()
.get(service_name.as_str())
.is_some()
{
return Ok(());
}
self.listeners
.write()
.unwrap()
.insert(service_name.to_string(), listener.clone());
let zk_listener =
self.create_listener(zk_path.clone(), service_name.to_string(), listener.clone());
let zk_changed_paths = self.zk_client.get_children_w(&zk_path, zk_listener);
let result = match zk_changed_paths {
Err(err) => {
error!("zk subscribe error: {}", err);
Vec::new()
}
Ok(urls) => urls
.iter()
.map(|node_key| {
let provider_url: Url = urlencoding::decode(node_key)
.unwrap()
.to_string()
.as_str()
.into();
provider_url
})
.collect(),
};
info!("notifying {}->{:?}", service_name, result);
listener.notify(ServiceEvent {
key: service_name,
action: String::from("ADD"),
service: result,
});
Ok(())
}