in core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java [461:570]
protected void persistNowInternal(boolean alreadyHasMutex) {
if (!isActive() && state != ListenerState.STOPPING) {
return;
}
try {
if (!alreadyHasMutex) persistingMutex.acquire();
if (!isActive() && state != ListenerState.STOPPING) return;
// Writes to the datastore are lossy. We'll just log failures and move on.
// (Most) entities will get updated multiple times in their lifecycle
// so not a huge deal. planeId does not get updated so if the first
// write fails it's not available to the HA cluster at all. That's why it
// gets periodically written to the datastore.
updatePlaneIdIfTimedOut();
// Atomically switch the delta, so subsequent modifications will be done in the
// next scheduled persist
DeltaCollector prevDeltaCollector;
synchronized (this) {
prevDeltaCollector = deltaCollector;
deltaCollector = new DeltaCollector();
}
if (LOG.isDebugEnabled() && shouldLogCheckpoint()) LOG.debug("Checkpointing delta of memento: "
+ "updating entities={}, locations={}, policies={}, enrichers={}, catalog items={}, bundles={}; "
+ "removing entities={}, locations={}, policies={}, enrichers={}, catalog items={}, bundles={}",
new Object[] {
limitedCountString(prevDeltaCollector.entities), limitedCountString(prevDeltaCollector.locations), limitedCountString(prevDeltaCollector.policies), limitedCountString(prevDeltaCollector.enrichers), limitedCountString(prevDeltaCollector.catalogItems), limitedCountString(prevDeltaCollector.bundles),
limitedCountString(prevDeltaCollector.removedEntityIds), limitedCountString(prevDeltaCollector.removedLocationIds), limitedCountString(prevDeltaCollector.removedPolicyIds), limitedCountString(prevDeltaCollector.removedEnricherIds), limitedCountString(prevDeltaCollector.removedCatalogItemIds), limitedCountString(prevDeltaCollector.removedBundleIds)});
if (rePersistReferencedObjectsEnabled) {
addReferencedObjects(prevDeltaCollector);
if (LOG.isTraceEnabled()) LOG.trace("Checkpointing delta of memento with references: "
+ "updating {} entities, {} locations, {} policies, {} enrichers, {} catalog items, {} bundles; "
+ "removing {} entities, {} locations, {} policies, {} enrichers, {} catalog items, {} bundles",
new Object[] {
prevDeltaCollector.entities.size(), prevDeltaCollector.locations.size(), prevDeltaCollector.policies.size(), prevDeltaCollector.enrichers.size(), prevDeltaCollector.catalogItems.size(), prevDeltaCollector.bundles.size(),
prevDeltaCollector.removedEntityIds.size(), prevDeltaCollector.removedLocationIds.size(), prevDeltaCollector.removedPolicyIds.size(), prevDeltaCollector.removedEnricherIds.size(), prevDeltaCollector.removedCatalogItemIds.size(), prevDeltaCollector.removedBundleIds.size()});
}
// Generate mementos for everything that has changed in this time period
if (prevDeltaCollector.isEmpty()) {
if (LOG.isTraceEnabled()) LOG.trace("No changes to persist since last delta");
} else {
PersisterDeltaImpl persisterDelta = new PersisterDeltaImpl();
if (prevDeltaCollector.planeId != null) {
persisterDelta.planeId = prevDeltaCollector.planeId;
}
for (BrooklynObjectType type: BrooklynPersistenceUtils.STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER) {
for (BrooklynObject instance: prevDeltaCollector.getCollectionOfType(type)) {
try {
String skip = null;
if (instance instanceof Entity) {
if (Entities.isUnmanagingOrNoLongerManaged((Entity)instance)) skip = "unmanaging or no longer managed";
} else if (instance instanceof Location) {
if (!Locations.isManaged((Location)instance)) skip = "not managed";
} else if (instance instanceof EntityAdjunct) {
Maybe<Entity> entity = EntityAdjuncts.getEntity((EntityAdjunct) instance, false);
if (entity.isAbsent()) skip = "not assigned to any entity";
// if null, means adjunct doesn't tell us, which is weird, but don't skip
else if (entity.isPresentAndNonNull()) {
if (Entities.isUnmanagingOrNoLongerManaged(entity.get())) skip = "associated entity is unmanaging or no longer managed";
}
}
if (skip!=null) {
// not uncommon if deleting lots of things
LOG.debug("Persistence skipping change to "+instance+" because "+skip+"; expect this to be unpersisted soon");
continue;
} else {
persisterDelta.add(type, ((BrooklynObjectInternal) instance).getRebindSupport().getMemento());
}
} catch (Exception e) {
exceptionHandler.onGenerateMementoFailed(type, instance, e);
}
}
}
for (BrooklynObjectType type: BrooklynPersistenceUtils.STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER) {
persisterDelta.removed(type, prevDeltaCollector.getRemovedIdsOfType(type));
}
/*
* Need to guarantee "happens before", with any thread that subsequently reads
* the mementos.
*
* See MementoFileWriter.writeNow for the corresponding synchronization,
* that guarantees its thread has values visible for reads.
*/
synchronized (new Object()) {}
// Tell the persister to persist it
persister.delta(persisterDelta, exceptionHandler);
}
} catch (Exception e) {
if (isActive()) {
throw Exceptions.propagate(e);
} else {
Exceptions.propagateIfFatal(e);
LOG.debug("Problem persisting, but no longer active (ignoring)", e);
}
} finally {
synchronized (writeCount) {
writeCount.incrementAndGet();
writeCount.notifyAll();
}
if (!alreadyHasMutex) persistingMutex.release();
}
}