in hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java [381:466]
long runCycle(
ProducerListeners listeners,
HollowProducer.Incremental.IncrementalPopulator incrementalPopulator, HollowProducer.Populator populator,
Status.StageWithStateBuilder cycleStatus, long toVersion) {
// 1. Begin a new cycle
Artifacts artifacts = new Artifacts();
HollowWriteStateEngine writeEngine = getWriteEngine();
try {
// 1a. Prepare the write state
writeEngine.prepareForNextCycle();
// save timestamp in ms of when cycle starts
writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_METRIC_CYCLE_START, String.valueOf(System.currentTimeMillis()));
// 2. Populate the state
populate(listeners, incrementalPopulator, populator, toVersion);
// 3. Produce a new state if there's work to do
if (writeEngine.hasChangedSinceLastCycle()) {
boolean schemaChangedFromPriorVersion = readStates.hasCurrent() &&
!writeEngine.hasIdenticalSchemas(readStates.current().getStateEngine());
updateHeaderTags(writeEngine, toVersion, schemaChangedFromPriorVersion);
if (allowTypeResharding && forceCoverageOfTypeResharding) {
int randomFactor = (int) Math.pow(2, (int) (Math.random() * 9) - 4); // random power of 2 in the range [-4, 4]
long adjustedShardSize = targetMaxTypeShardSize * randomFactor;
writeEngine.setTargetMaxTypeShardSize(adjustedShardSize);
}
// 3a. Publish, run checks & validation, then announce new state consumers
publish(listeners, toVersion, artifacts);
ReadStateHelper candidate = readStates.roundtrip(toVersion);
cycleStatus.readState(candidate.pending());
candidate = doIntegrityCheck ?
checkIntegrity(listeners, candidate, artifacts, schemaChangedFromPriorVersion) :
noIntegrityCheck(candidate, artifacts);
try {
validate(listeners, candidate.pending());
announce(listeners, candidate.pending());
readStates = candidate.commit();
cycleStatus.readState(readStates.current()).success();
} catch (Throwable th) {
if (artifacts.hasReverseDelta()) {
applyDelta(artifacts.reverseDelta, candidate.pending().getStateEngine());
readStates = candidate.rollback();
}
throw th;
}
lastSuccessfulCycle = toVersion;
} else {
// 3b. Nothing to do; reset the effects of Step 2
// Return the lastSucessfulCycle to the caller thereby
// the callee can track that version against consumers
// without having to listen to events.
// Consistently report the version that would be used if
// data had been published for the events. This
// is for consistency in tracking
writeEngine.resetToLastPrepareForNextCycle();
cycleStatus.success();
listeners.fireNoDelta(toVersion);
log.info("Populate stage completed with no delta in output state; skipping publish, announce, etc.");
}
} catch (Throwable th) {
try {
writeEngine.resetToLastPrepareForNextCycle();
} catch (Throwable innerTh) {
log.log(Level.SEVERE, "resetToLastPrepareForNextCycle encountered an exception when attempting recovery:", innerTh);
// swallow the inner throwable to preserve the original
}
cycleStatus.fail(th);
if (th instanceof RuntimeException) {
throw (RuntimeException) th;
}
throw new RuntimeException(th);
} finally {
artifacts.cleanup();
cycleCountSincePrimaryStatus ++;
}
return lastSuccessfulCycle;
}