long runCycle()

in hollow/src/main/java/com/netflix/hollow/api/producer/AbstractHollowProducer.java [381:466]


    /**
     * Runs a single producer cycle targeting {@code toVersion}.
     *
     * <p>The write engine is prepared and populated. If the populated state differs from the
     * previous cycle, the new state is published, round-tripped into a candidate read state,
     * optionally integrity-checked, validated and announced; on success the candidate is
     * committed and {@code lastSuccessfulCycle} is set to {@code toVersion}. If nothing changed,
     * the write engine is reset, the cycle is marked successful and a no-delta event is fired.
     *
     * <p>On any failure the write engine is reset to its last prepared state (best effort),
     * {@code cycleStatus} is marked failed and the failure is rethrown. Artifacts are always
     * cleaned up and {@code cycleCountSincePrimaryStatus} is always incremented.
     *
     * @param listeners            listeners notified of the cycle's stages
     * @param incrementalPopulator populator handed to the populate stage
     * @param populator            populator handed to the populate stage
     * @param cycleStatus          status builder updated with the read state and the outcome
     * @param toVersion            the version this cycle would produce
     * @return {@code lastSuccessfulCycle} as it stands after this cycle: {@code toVersion} when a
     *         new state was announced, otherwise the value left by an earlier cycle
     * @throws RuntimeException if any stage fails; a {@code RuntimeException} is rethrown as-is,
     *         any other {@code Throwable} is wrapped
     */
    long runCycle(
            ProducerListeners listeners,
            HollowProducer.Incremental.IncrementalPopulator incrementalPopulator, HollowProducer.Populator populator,
            Status.StageWithStateBuilder cycleStatus, long toVersion) {
        // 1. Begin a new cycle
        Artifacts artifacts = new Artifacts();
        HollowWriteStateEngine writeEngine = getWriteEngine();

        try {
            // 1a. Prepare the write state
            writeEngine.prepareForNextCycle();

            // save timestamp in ms of when cycle starts
            writeEngine.addHeaderTag(HollowStateEngine.HEADER_TAG_METRIC_CYCLE_START, String.valueOf(System.currentTimeMillis()));

            // 2. Populate the state
            populate(listeners, incrementalPopulator, populator, toVersion);

            // 3. Produce a new state if there's work to do
            if (writeEngine.hasChangedSinceLastCycle()) {
                boolean schemaChangedFromPriorVersion = readStates.hasCurrent() &&
                        !writeEngine.hasIdenticalSchemas(readStates.current().getStateEngine());
                updateHeaderTags(writeEngine, toVersion, schemaChangedFromPriorVersion);

                if (allowTypeResharding && forceCoverageOfTypeResharding) {
                    // Scale the target shard size by a random power of two, 2^e with e uniform in [-4, 4].
                    // The factor is applied by shifting rather than via (int) Math.pow(2, e): that cast
                    // truncates to 0 for every e < 0, which yields a target shard size of 0.
                    int exponent = (int) (Math.random() * 9) - 4;
                    long baseShardSize = targetMaxTypeShardSize;
                    long adjustedShardSize = exponent >= 0
                            ? baseShardSize << exponent
                            : Math.max(1L, baseShardSize >> -exponent); // never scale down to 0
                    writeEngine.setTargetMaxTypeShardSize(adjustedShardSize);
                }

                // 3a. Publish, run checks & validation, then announce the new state to consumers
                publish(listeners, toVersion, artifacts);

                ReadStateHelper candidate = readStates.roundtrip(toVersion);
                cycleStatus.readState(candidate.pending());
                candidate = doIntegrityCheck ? 
                        checkIntegrity(listeners, candidate, artifacts, schemaChangedFromPriorVersion) :
                            noIntegrityCheck(candidate, artifacts);

                try {
                    validate(listeners, candidate.pending());
                    announce(listeners, candidate.pending());

                    readStates = candidate.commit();
                    cycleStatus.readState(readStates.current()).success();
                } catch (Throwable th) {
                    // Validation/announcement failed: when a reverse delta is available, apply it to the
                    // pending state engine and roll the read states back before propagating the failure.
                    if (artifacts.hasReverseDelta()) {
                        applyDelta(artifacts.reverseDelta, candidate.pending().getStateEngine());
                        readStates = candidate.rollback();
                    }
                    throw th;
                }
                lastSuccessfulCycle = toVersion;
            } else {
                // 3b. Nothing to do; reset the effects of Step 2.
                // lastSuccessfulCycle is left unchanged and is what gets returned, so the
                // caller can track that version against consumers without having to listen
                // to events. The no-delta event still reports toVersion, the version that
                // would have been used had data been published, for consistency in tracking.
                writeEngine.resetToLastPrepareForNextCycle();
                cycleStatus.success();
                listeners.fireNoDelta(toVersion);

                log.info("Populate stage completed with no delta in output state; skipping publish, announce, etc.");
            }
        } catch (Throwable th) {
            try {
                writeEngine.resetToLastPrepareForNextCycle();
            } catch (Throwable innerTh) {
                log.log(Level.SEVERE, "resetToLastPrepareForNextCycle encountered an exception when attempting recovery:", innerTh);
                // swallow the inner throwable to preserve the original
            }
            cycleStatus.fail(th);

            if (th instanceof RuntimeException) {
                throw (RuntimeException) th;
            }
            throw new RuntimeException(th);
        } finally {
            artifacts.cleanup();
            cycleCountSincePrimaryStatus ++;
        }
        return lastSuccessfulCycle;
    }