in src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementRegistryImpl.java [299:406]
public synchronized long registerAnnouncement(final Announcement topologyAnnouncement) {
if (topologyAnnouncement==null) {
throw new IllegalArgumentException("topologyAnnouncement must not be null");
}
if (!topologyAnnouncement.isValid()) {
logger.warn("topologyAnnouncement is not valid");
return -1;
}
if (resourceResolverFactory == null) {
logger.error("registerAnnouncement: resourceResolverFactory is null");
return -1;
}
final CachedAnnouncement cachedAnnouncement =
ownAnnouncementsCache.get(topologyAnnouncement.getOwnerId());
if (cachedAnnouncement!=null) {
if (logger.isDebugEnabled()) {
logger.debug("registerAnnouncement: got existing cached announcement for ownerId="+topologyAnnouncement.getOwnerId());
}
try{
if (topologyAnnouncement.correspondsTo(cachedAnnouncement.getAnnouncement())) {
// then nothing has changed with this announcement, so just update
// the heartbeat and fine is.
// this should actually be the normal case for a stable connector
logger.debug("registerAnnouncement: nothing has changed, only updating heartbeat in-memory.");
return cachedAnnouncement.registerPing(topologyAnnouncement, config);
}
logger.debug("registerAnnouncement: incoming announcement differs from existing one!");
} catch(JsonException e) {
logger.error("registerAnnouncement: got JSONException while converting incoming announcement to JSON: "+e, e);
}
// otherwise the repository and the cache require to be updated
// resetting the cache therefore at this point already
ownAnnouncementsCache.remove(topologyAnnouncement.getOwnerId());
} else {
logger.debug("registerAnnouncement: no cached announcement yet for ownerId="+topologyAnnouncement.getOwnerId());
}
logger.debug("registerAnnouncement: getting the list of all local announcements");
final Collection<Announcement> announcements = new LinkedList<>();
fillWithCachedAnnouncements(announcements);
if (logger.isDebugEnabled()) {
logger.debug("registerAnnouncement: list returned: "+(announcements==null ? "null" : announcements.size()));
}
for (Iterator<Announcement> it1 = announcements.iterator(); it1
.hasNext();) {
Announcement announcement = it1.next();
if (announcement.getOwnerId().equals(
topologyAnnouncement.getOwnerId())) {
// then this is from the same owner - skip this
continue;
}
// analyse to see if any of the instances in the announcement
// include the new owner
Collection<InstanceDescription> attachedInstances = announcement
.listInstances();
for (Iterator<InstanceDescription> it2 = attachedInstances
.iterator(); it2.hasNext();) {
InstanceDescription instanceDescription = it2.next();
if (topologyAnnouncement.getOwnerId().equals(
instanceDescription.getSlingId())) {
logger.info("registerAnnouncement: already have this instance attached: "
+ instanceDescription.getSlingId());
return -1;
}
}
}
ResourceResolver resourceResolver = null;
try {
resourceResolver = resourceResolverFactory
.getServiceResourceResolver(null);
final Resource announcementsResource = ResourceHelper
.getOrCreateResource(
resourceResolver,
config.getClusterInstancesPath()
+ "/"
+ slingId
+ "/announcements");
topologyAnnouncement.persistTo(announcementsResource);
resourceResolver.commit();
ownAnnouncementsCache.put(topologyAnnouncement.getOwnerId(),
new CachedAnnouncement(topologyAnnouncement, config));
} catch (LoginException e) {
logger.error(
"registerAnnouncement: could not log in administratively: "
+ e, e);
throw new RuntimeException("Could not log in to repository (" + e
+ ")", e);
} catch (PersistenceException e) {
logger.error("registerAnnouncement: got a PersistenceException: "
+ e, e);
throw new RuntimeException(
"Exception while talking to repository (" + e + ")", e);
} catch (JsonException e) {
logger.error("registerAnnouncement: got a JSONException: " + e, e);
throw new RuntimeException("Exception while converting json (" + e
+ ")", e);
} finally {
if (resourceResolver != null) {
resourceResolver.close();
}
}
return 0;
}