in nifi-extension-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java [602:808]
/**
 * Lists entities from the target system, using tracked timestamps (and the identifiers of entities processed at the
 * newest processed timestamp) to decide which entities are new.
 * <p>
 * Outline of a single invocation:
 * <ol>
 *     <li>If no in-memory latest listed timestamp is known, no last processed timestamp is known, or this node was just
 *     elected primary, tracking state is reloaded from the state scope returned by {@code getStateScope(context)}:
 *     the latest listed timestamp, the last processed timestamp, and the identifiers stored under
 *     {@code IDENTIFIER_PREFIX} keys. If the reloaded latest listed timestamp equals the in-memory one, the processor
 *     yields without listing. If state cannot be read, the processor yields without listing.</li>
 *     <li>{@code performListing} is called with the minimum timestamp to list. When a minimum timestamp is known, only
 *     entities at or after it (and at or after the last processed timestamp, when one is known) are kept; otherwise all
 *     returned entities are kept. Kept entities are grouped by timestamp in ascending order.</li>
 *     <li>If the newest timestamp equals the previously listed one, the processor yields when the minimal listing lag
 *     since the last run has not passed, or when every entity at that timestamp was already processed. If the newest
 *     timestamp is new, its group is held back for a cycle while it is not older than the current time minus the
 *     listing lag, truncated to the target system timestamp precision.</li>
 *     <li>Remaining entities are written as records when a Record Writer is set, otherwise as individual FlowFiles.
 *     Tracking state is then persisted, and the session is committed asynchronously if anything was emitted.</li>
 * </ol>
 *
 * @param context the process context supplying properties, the state scope and yield control
 * @param session the process session used to read and write state, emit output and commit
 * @throws ProcessException declared for callers; this method does not throw it explicitly, but it may propagate from
 *                          delegated calls
 */
public void listByTrackingTimestamps(final ProcessContext context, final ProcessSession session) throws ProcessException {
    Long minTimestampToListMillis = lastListedLatestEntryTimestampMillis;
    if (lastListedLatestEntryTimestampMillis == null || lastProcessedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
        try {
            boolean noUpdateRequired = false;
            // Attempt to retrieve state from the state manager if a last listing was not yet established or
            // if just elected the primary node
            final StateMap stateMap = session.getState(getStateScope(context));
            latestIdentifiersProcessed.clear();
            for (final Map.Entry<String, String> state : stateMap.toMap().entrySet()) {
                final String k = state.getKey();
                final String v = state.getValue();
                if (v == null || v.isEmpty()) {
                    continue;
                }
                if (LATEST_LISTED_ENTRY_TIMESTAMP_KEY.equals(k)) {
                    minTimestampToListMillis = Long.parseLong(v);
                    // If our determined timestamp is the same as that of our last listing, skip this execution as there are no updates
                    if (minTimestampToListMillis.equals(lastListedLatestEntryTimestampMillis)) {
                        noUpdateRequired = true;
                    } else {
                        lastListedLatestEntryTimestampMillis = minTimestampToListMillis;
                    }
                } else if (LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY.equals(k)) {
                    lastProcessedLatestEntryTimestampMillis = Long.parseLong(v);
                } else if (k.startsWith(IDENTIFIER_PREFIX)) {
                    latestIdentifiersProcessed.add(v);
                }
            }
            justElectedPrimaryNode = false;
            if (noUpdateRequired) {
                getLogger().debug("No update required for last listed entity: yielding");
                context.yield();
                return;
            }
        } catch (final IOException ioe) {
            // Pass the exception so the underlying cause of the state retrieval failure is not lost
            getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.", ioe);
            context.yield();
            return;
        }
    }

    final List<T> entityList;
    // Capture the run time before listing: the nano time is compared against lastRunTimeNanos for the listing lag check,
    // and the wall-clock millis are used to compute the minimum reliable timestamp.
    final long currentRunTimeNanos = getCurrentNanoTime();
    final long currentRunTimeMillis = getCurrentTime();
    try {
        entityList = performListing(context, minTimestampToListMillis, ListingMode.EXECUTION);
    } catch (final IOException e) {
        getLogger().error("Failed to perform listing on remote host due to {}", e.getMessage(), e);
        context.yield();
        return;
    }

    if (entityList == null || entityList.isEmpty()) {
        getLogger().debug("No data found matching minimum timestamp [{}]: yielding", minTimestampToListMillis);
        context.yield();
        return;
    }

    Long latestListedEntryTimestampThisCycleMillis = null;
    final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();

    // Build a sorted map to determine the latest possible entries, while also detecting whether the target
    // system reports sub-second or sub-minute timestamp components (used for precision auto-detection).
    boolean targetSystemHasMilliseconds = false;
    boolean targetSystemHasSeconds = false;
    for (final T entity : entityList) {
        final long entityTimestampMillis = entity.getTimestamp();
        if (!targetSystemHasMilliseconds) {
            targetSystemHasMilliseconds = entityTimestampMillis % 1000 > 0;
        }
        if (!targetSystemHasSeconds) {
            targetSystemHasSeconds = entityTimestampMillis % 60_000 > 0;
        }

        // New entries are all those that occur at or after the associated timestamp.
        // A null last processed timestamp means nothing has been processed yet, so it imposes no lower bound
        // (and must not be unboxed).
        final boolean newEntry = minTimestampToListMillis == null
                || (entityTimestampMillis >= minTimestampToListMillis
                    && (lastProcessedLatestEntryTimestampMillis == null || entityTimestampMillis >= lastProcessedLatestEntryTimestampMillis));
        if (newEntry) {
            final List<T> entitiesForTimestamp = orderedEntries.computeIfAbsent(entityTimestampMillis, k -> new ArrayList<>());
            entitiesForTimestamp.add(entity);
        }
    }

    int entitiesListed = 0;

    if (!orderedEntries.isEmpty()) {
        latestListedEntryTimestampThisCycleMillis = orderedEntries.lastKey();

        // Determine target system time precision.
        String specifiedPrecision = context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue();
        if (StringUtils.isBlank(specifiedPrecision)) {
            // If TARGET_SYSTEM_TIMESTAMP_PRECISION is not supported by the Processor, then specifiedPrecision can be null, instead of its default value.
            specifiedPrecision = getDefaultTimePrecision();
        }
        // Auto-detect picks the finest unit observed in this listing; otherwise the configured precision is used,
        // falling back to minutes for any value that is neither millis nor seconds.
        final TimeUnit targetSystemTimePrecision
                = PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision)
                ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES
                : PRECISION_MILLIS.getValue().equals(specifiedPrecision) ? TimeUnit.MILLISECONDS
                : PRECISION_SECONDS.getValue().equals(specifiedPrecision) ? TimeUnit.SECONDS : TimeUnit.MINUTES;
        final Long listingLagMillis = LISTING_LAG_MILLIS.get(targetSystemTimePrecision);

        // If the last listing time is equal to the newest entries previously seen,
        // another iteration has occurred without new files and special handling is needed to avoid starvation
        if (latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis)) {
            /* We need to wait for another cycle when either:
             * - If we have not eclipsed the minimal listing lag needed due to being triggered too soon after the last run
             * - The latest listed entity timestamp is equal to the last processed time, meaning we handled those items originally passed over. No need to process it again.
             */
            final long listingLagNanos = TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
            final boolean minimalListingLagNotPassed = currentRunTimeNanos - lastRunTimeNanos < listingLagNanos;
            if (minimalListingLagNotPassed) {
                getLogger().debug("Minimal listing lag not passed: yielding");
                context.yield();
                return;
            }

            // Up to date only if the newest timestamp was already processed AND every entity at that timestamp
            // has an identifier recorded as processed.
            final boolean latestListedEntryIsUpToDate = latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis)
                    && orderedEntries.get(latestListedEntryTimestampThisCycleMillis).stream().allMatch(entity -> latestIdentifiersProcessed.contains(entity.getIdentifier()));
            if (latestListedEntryIsUpToDate) {
                getLogger().debug("Latest entry already listed with timestamp [{}]: yielding", latestListedEntryTimestampThisCycleMillis);
                context.yield();
                return;
            }
        } else {
            // Convert minimum reliable timestamp into target system time unit, in order to truncate unreliable digits.
            final long minimumReliableTimestampInFilesystemTimeUnit = targetSystemTimePrecision.convert(currentRunTimeMillis - listingLagMillis, TimeUnit.MILLISECONDS);
            final long minimumReliableTimestampMillis = targetSystemTimePrecision.toMillis(minimumReliableTimestampInFilesystemTimeUnit);
            // If the latest listed entity is not old enough, compared with the minimum timestamp, then wait for another cycle.
            // The minimum timestamp should be reliable to determine that no further entries will be added with the same timestamp based on the target system time precision.
            if (minimumReliableTimestampMillis < latestListedEntryTimestampThisCycleMillis) {
                // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
                orderedEntries.remove(latestListedEntryTimestampThisCycleMillis);
            }
        }

        final boolean writerSet = context.getProperty(RECORD_WRITER).isSet();
        if (writerSet) {
            try {
                entitiesListed = createRecordsForEntities(context, session, orderedEntries);
            } catch (final IOException | SchemaNotFoundException e) {
                getLogger().error("Failed to write listing to FlowFile", e);
                context.yield();
                return;
            }
        } else {
            entitiesListed = createFlowFilesForEntities(context, session, orderedEntries);
        }
    }

    // As long as we have a listing timestamp, there is meaningful state to capture regardless of any outputs generated
    if (latestListedEntryTimestampThisCycleMillis != null) {
        final boolean processedNewFiles = entitiesListed > 0;
        if (processedNewFiles) {
            // If there have been files created, update the last timestamp we processed.
            // Retrieving lastKey instead of using latestListedEntryTimestampThisCycleMillis is intentional here,
            // because latestListedEntryTimestampThisCycleMillis might be removed if it's not old enough.
            if (!orderedEntries.lastKey().equals(lastProcessedLatestEntryTimestampMillis)) {
                // If the latest timestamp at this cycle becomes different than the previous one, we need to clear identifiers.
                // If it didn't change, we need to add identifiers.
                latestIdentifiersProcessed.clear();
            }
            // Capture latestIdentifierProcessed.
            latestIdentifiersProcessed.addAll(orderedEntries.lastEntry().getValue().stream().map(T::getIdentifier).collect(Collectors.toList()));
            lastProcessedLatestEntryTimestampMillis = orderedEntries.lastKey();
        }

        if (!latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis) || processedNewFiles) {
            // We have performed a listing and pushed any FlowFiles out that may have been generated
            // Now, we need to persist state about the Last Modified timestamp of the newest file
            // that we evaluated. We do this in order to avoid pulling in the same file twice.
            // However, we want to save the state both locally and remotely.
            // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the
            // previously Primary Node left off.
            // We also store the state locally so that if the node is restarted, and the node cannot contact
            // the distributed state cache, the node can continue to run (if it is primary node).
            try {
                lastListedLatestEntryTimestampMillis = latestListedEntryTimestampThisCycleMillis;
                // NOTE(review): lastProcessedLatestEntryTimestampMillis can still be null here if nothing has ever been
                // processed; confirm that persist(...) accepts a null value (e.g. it does not take a primitive long).
                persist(latestListedEntryTimestampThisCycleMillis, lastProcessedLatestEntryTimestampMillis, latestIdentifiersProcessed, session, getStateScope(context));
            } catch (final IOException ioe) {
                getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
                        + "if another node begins executing this Processor, data duplication may occur.", ioe);
            }
        }

        if (processedNewFiles) {
            getLogger().info("Successfully created listing with {} new objects", entitiesListed);
            session.commitAsync();
        }

        lastRunTimeNanos = currentRunTimeNanos;
    } else {
        getLogger().debug("There is no data to list: yielding");
        context.yield();

        // Set lastListedLatestEntryTimestampMillis to 0 so that we don't continually poll the distributed cache / local file system
        if (lastListedLatestEntryTimestampMillis == null) {
            lastListedLatestEntryTimestampMillis = 0L;
        }
    }
}