public void listByTrackingTimestamps()

in nifi-extension-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java [602:808]


    /**
     * Performs one listing cycle using the timestamp-tracking strategy.
     *
     * <p>The cycle proceeds as follows:</p>
     * <ol>
     *   <li>If no previous listing/processing timestamp is held in memory, or this node was just elected primary,
     *       the timestamps and processed identifiers are restored from the state obtained via the session.</li>
     *   <li>{@code performListing} is invoked with the minimum timestamp to list.</li>
     *   <li>Listed entities are grouped by timestamp; only entities at or after both the minimum timestamp and the
     *       last processed timestamp are retained.</li>
     *   <li>The group carrying the newest timestamp is held back for a later cycle when it is not yet older than
     *       the listing lag associated with the target system's timestamp precision.</li>
     *   <li>The remaining entities are emitted (as records when a Record Writer is set, otherwise as FlowFiles),
     *       and the listing state is persisted.</li>
     * </ol>
     *
     * <p>Whenever there is nothing to do, or a recoverable I/O failure occurs, the method yields the context and
     * returns without emitting anything.</p>
     *
     * @param context the process context used to read properties, resolve the state scope and yield
     * @param session the process session used to read/persist state and to emit the listing
     * @throws ProcessException declared by the signature; not thrown directly by this method's own statements
     */
    public void listByTrackingTimestamps(final ProcessContext context, final ProcessSession session) throws ProcessException {
        Long minTimestampToListMillis = lastListedLatestEntryTimestampMillis;

        if (lastListedLatestEntryTimestampMillis == null || lastProcessedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
            try {
                boolean noUpdateRequired = false;
                // Attempt to retrieve state from the state manager if a last listing was not yet established or
                // if just elected the primary node
                final StateMap stateMap = session.getState(getStateScope(context));
                latestIdentifiersProcessed.clear();
                for (final Map.Entry<String, String> state : stateMap.toMap().entrySet()) {
                    final String k = state.getKey();
                    final String v = state.getValue();
                    // Entries without a value carry no usable state
                    if (v == null || v.isEmpty()) {
                        continue;
                    }

                    if (LATEST_LISTED_ENTRY_TIMESTAMP_KEY.equals(k)) {
                        minTimestampToListMillis = Long.parseLong(v);
                        // If our determined timestamp is the same as that of our last listing, skip this execution as there are no updates
                        if (minTimestampToListMillis.equals(lastListedLatestEntryTimestampMillis)) {
                            noUpdateRequired = true;
                        } else {
                            lastListedLatestEntryTimestampMillis = minTimestampToListMillis;
                        }
                    } else if (LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY.equals(k)) {
                        lastProcessedLatestEntryTimestampMillis = Long.parseLong(v);
                    } else if (k.startsWith(IDENTIFIER_PREFIX)) {
                        latestIdentifiersProcessed.add(v);
                    }
                }
                justElectedPrimaryNode = false;
                if (noUpdateRequired) {
                    getLogger().debug("No update required for last listed entity: yielding");
                    context.yield();
                    return;
                }
            } catch (final IOException ioe) {
                // Pass the exception to the logger so the cause of the state retrieval failure is not lost
                getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.", ioe);
                context.yield();
                return;
            }
        }

        final List<T> entityList;
        // Keep track of when this run started; the nano time is used for the listing-lag check against the
        // previous run, the millisecond time for computing the minimum reliable timestamp
        final long currentRunTimeNanos = getCurrentNanoTime();
        final long currentRunTimeMillis = getCurrentTime();
        try {
            entityList = performListing(context, minTimestampToListMillis, ListingMode.EXECUTION);
        } catch (final IOException e) {
            getLogger().error("Failed to perform listing on remote host due to {}", e.getMessage(), e);
            context.yield();
            return;
        }

        if (entityList == null || entityList.isEmpty()) {
            getLogger().debug("No data found matching minimum timestamp [{}]: yielding", minTimestampToListMillis);
            context.yield();
            return;
        }

        Long latestListedEntryTimestampThisCycleMillis = null;
        final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();

        // Read the last processed timestamp once; it may be absent (null) when the restored state did not contain it,
        // in which case it must not be unboxed in the comparison below
        final Long lastProcessedTimestampMillis = lastProcessedLatestEntryTimestampMillis;

        // Build a sorted map to determine the latest possible entries
        boolean targetSystemHasMilliseconds = false;
        boolean targetSystemHasSeconds = false;
        for (final T entity : entityList) {
            final long entityTimestampMillis = entity.getTimestamp();
            // A non-zero sub-second remainder on any entity indicates millisecond values are present
            if (!targetSystemHasMilliseconds) {
                targetSystemHasMilliseconds = entityTimestampMillis % 1000 > 0;
            }
            // A non-zero sub-minute remainder on any entity indicates second values are present
            if (!targetSystemHasSeconds) {
                targetSystemHasSeconds = entityTimestampMillis % 60_000 > 0;
            }
            // New entries are all those that occur at or after the associated timestamp
            // (and at or after the last processed timestamp, when one is known)
            final boolean newEntry = minTimestampToListMillis == null
                    || (entityTimestampMillis >= minTimestampToListMillis
                        && (lastProcessedTimestampMillis == null || entityTimestampMillis >= lastProcessedTimestampMillis));

            if (newEntry) {
                final List<T> entitiesForTimestamp = orderedEntries.computeIfAbsent(entityTimestampMillis, k -> new ArrayList<>());
                entitiesForTimestamp.add(entity);
            }
        }

        int entitiesListed = 0;

        if (!orderedEntries.isEmpty()) {
            latestListedEntryTimestampThisCycleMillis = orderedEntries.lastKey();

            // Determine target system time precision.
            String specifiedPrecision = context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue();
            if (StringUtils.isBlank(specifiedPrecision)) {
                // If TARGET_SYSTEM_TIMESTAMP_PRECISION is not supported by the Processor, then specifiedPrecision can be null, instead of its default value.
                specifiedPrecision = getDefaultTimePrecision();
            }
            // Auto-detect picks the finest precision observed in the listed timestamps; otherwise the configured
            // precision is used, falling back to minutes for any other value
            final TimeUnit targetSystemTimePrecision
                    = PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision)
                    ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES
                    : PRECISION_MILLIS.getValue().equals(specifiedPrecision) ? TimeUnit.MILLISECONDS
                    : PRECISION_SECONDS.getValue().equals(specifiedPrecision) ? TimeUnit.SECONDS : TimeUnit.MINUTES;
            final Long listingLagMillis = LISTING_LAG_MILLIS.get(targetSystemTimePrecision);

            // If the last listing time is equal to the newest entries previously seen,
            // another iteration has occurred without new files and special handling is needed to avoid starvation
            if (latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis)) {
                /* We need to wait for another cycle when either:
                 *   - If we have not eclipsed the minimal listing lag needed due to being triggered too soon after the last run
                 *   - The latest listed entity timestamp is equal to the last processed time, meaning we handled those items originally passed over. No need to process it again.
                 */
                final long listingLagNanos = TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
                final boolean minimalListingLagNotPassed = currentRunTimeNanos - lastRunTimeNanos < listingLagNanos;

                if (minimalListingLagNotPassed) {
                    getLogger().debug("Minimal listing lag not passed: yielding");
                    context.yield();
                    return;
                }

                // Up to date only when the newest timestamp was already processed AND every entity carrying that
                // timestamp has an identifier recorded as processed
                final boolean latestListedEntryIsUpToDate = latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis)
                        && orderedEntries.get(latestListedEntryTimestampThisCycleMillis).stream().allMatch(entity -> latestIdentifiersProcessed.contains(entity.getIdentifier()));

                if (latestListedEntryIsUpToDate) {
                    getLogger().debug("Latest entry already listed with timestamp [{}]: yielding", latestListedEntryTimestampThisCycleMillis);
                    context.yield();
                    return;
                }

            } else {
                // Convert minimum reliable timestamp into target system time unit, in order to truncate unreliable digits.
                final long minimumReliableTimestampInFilesystemTimeUnit = targetSystemTimePrecision.convert(currentRunTimeMillis - listingLagMillis, TimeUnit.MILLISECONDS);
                final long minimumReliableTimestampMillis = targetSystemTimePrecision.toMillis(minimumReliableTimestampInFilesystemTimeUnit);
                // If the latest listed entity is not old enough, compared with the minimum timestamp, then wait for another cycle.
                // The minimum timestamp should be reliable to determine that no further entries will be added with the same timestamp based on the target system time precision.
                if (minimumReliableTimestampMillis < latestListedEntryTimestampThisCycleMillis) {
                    // Newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
                    orderedEntries.remove(latestListedEntryTimestampThisCycleMillis);
                }
            }


            final boolean writerSet = context.getProperty(RECORD_WRITER).isSet();
            if (writerSet) {
                try {
                    entitiesListed = createRecordsForEntities(context, session, orderedEntries);
                } catch (final IOException | SchemaNotFoundException e) {
                    getLogger().error("Failed to write listing to FlowFile", e);
                    context.yield();
                    return;
                }
            } else {
                entitiesListed = createFlowFilesForEntities(context, session, orderedEntries);
            }
        }

        // As long as we have a listing timestamp, there is meaningful state to capture regardless of any outputs generated
        if (latestListedEntryTimestampThisCycleMillis != null) {
            final boolean processedNewFiles = entitiesListed > 0;

            if (processedNewFiles) {
                // If there have been files created, update the last timestamp we processed.
                // Retrieving lastKey instead of using latestListedEntryTimestampThisCycleMillis is intentional here,
                // because latestListedEntryTimestampThisCycleMillis might be removed if it's not old enough.
                if (!orderedEntries.lastKey().equals(lastProcessedLatestEntryTimestampMillis)) {
                    // If the latest timestamp at this cycle becomes different than the previous one, we need to clear identifiers.
                    // If it didn't change, we need to add identifiers.
                    latestIdentifiersProcessed.clear();
                }
                // Capture latestIdentifierProcessed.
                latestIdentifiersProcessed.addAll(orderedEntries.lastEntry().getValue().stream().map(T::getIdentifier).collect(Collectors.toList()));
                lastProcessedLatestEntryTimestampMillis = orderedEntries.lastKey();
            }

            if (!latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis) || processedNewFiles) {
                // We have performed a listing and pushed any FlowFiles out that may have been generated
                // Now, we need to persist state about the Last Modified timestamp of the newest file
                // that we evaluated. We do this in order to avoid pulling in the same file twice.
                // However, we want to save the state both locally and remotely.
                // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the
                // previously Primary Node left off.
                // We also store the state locally so that if the node is restarted, and the node cannot contact
                // the distributed state cache, the node can continue to run (if it is primary node).
                try {
                    lastListedLatestEntryTimestampMillis = latestListedEntryTimestampThisCycleMillis;
                    persist(latestListedEntryTimestampThisCycleMillis, lastProcessedLatestEntryTimestampMillis, latestIdentifiersProcessed, session, getStateScope(context));
                } catch (final IOException ioe) {
                    getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
                            + "if another node begins executing this Processor, data duplication may occur.", ioe);
                }
            }

            if (processedNewFiles) {
                getLogger().info("Successfully created listing with {} new objects", entitiesListed);
                session.commitAsync();
            }

            // Remember when this run started so the next run can evaluate the minimal listing lag
            lastRunTimeNanos = currentRunTimeNanos;
        } else {
            getLogger().debug("There is no data to list: yielding");
            context.yield();

            // lastListingTime = 0 so that we don't continually poll the distributed cache / local file system
            if (lastListedLatestEntryTimestampMillis == null) {
                lastListedLatestEntryTimestampMillis = 0L;
            }
        }
    }