public void onTrigger()

in nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java [293:534]


    /**
     * Pulls a batch of queued FlowFiles that share one release signal identifier, looks up the
     * corresponding signal through the configured cache service, and routes every pulled FlowFile
     * to one of the success, wait, expired or failure relationships.
     *
     * <p>The work happens in these stages:</p>
     * <ol>
     *   <li>Entries in {@code signalIdPenalties} whose timestamp is already in the past are removed.</li>
     *   <li>FlowFiles are fetched with a filter that accepts FlowFiles with a blank signal id (so they
     *       can be failed), rejects FlowFiles whose signal id is currently penalized, and otherwise
     *       accepts only FlowFiles matching the first non-penalized signal id it sees, stopping once
     *       the configured buffer count of matching FlowFiles has been accepted.</li>
     *   <li>FlowFiles with a blank signal id are routed to failure. If nothing else was fetched the
     *       method returns here, without contacting the cache.</li>
     *   <li>The signal for the chosen id is read from the cache. Each FlowFile is then checked for a
     *       valid wait start timestamp and for expiration, and, when a signal exists, validated as a
     *       release candidate.</li>
     *   <li>Candidates are released or kept waiting according to the signal, all FlowFiles are
     *       transferred, the signal id is optionally penalized, and the signal is completed or
     *       replaced in the cache when releases consumed counts from it.</li>
     * </ol>
     *
     * @param context provides the processor's property values and the cache controller service
     * @param session used to fetch, modify, penalize and transfer FlowFiles; rolled back if the
     *                signal cannot be updated in the cache
     * @throws ProcessException if reading the signal from the cache, or updating it afterwards,
     *                          fails with an {@link IOException}
     */
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

        final ComponentLog logger = getLogger();

        // Signal id is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
        final PropertyValue signalIdProperty = context.getProperty(RELEASE_SIGNAL_IDENTIFIER);
        final Integer bufferCount = context.getProperty(WAIT_BUFFER_COUNT).asInteger();

        // FlowFiles grouped by the relationship they will finally be transferred to.
        // getFlowFilesFor lazily creates the list for a relationship on first access.
        final Map<Relationship, List<FlowFile>> processedFlowFiles = new HashMap<>();
        final Function<Relationship, List<FlowFile>> getFlowFilesFor = r -> processedFlowFiles.computeIfAbsent(r, k -> new ArrayList<>());

        // State shared with the FlowFile filter below. Atomic holders are used because the filter
        // lambda needs mutable state while local variables captured by a lambda must be effectively final.
        final AtomicReference<String> targetSignalId = new AtomicReference<>();
        final AtomicInteger bufferedCount = new AtomicInteger(0);
        final List<FlowFile> failedFilteringFlowFiles = new ArrayList<>();
        // Counts each accepted FlowFile and tells the session to stop filtering once the
        // number of accepted FlowFiles equals the configured buffer count.
        final Supplier<FlowFileFilter.FlowFileFilterResult> acceptResultSupplier =
                () -> bufferedCount.incrementAndGet() == bufferCount ? ACCEPT_AND_TERMINATE : ACCEPT_AND_CONTINUE;

        // Clear expired penalties.
        // Each entry maps a signal id to the epoch-millisecond time at which its penalty ends.
        if (!signalIdPenalties.isEmpty()) {
            final Iterator<Entry<String, Long>> penaltyIterator = signalIdPenalties.entrySet().iterator();
            final long now = System.currentTimeMillis();
            while (penaltyIterator.hasNext()) {
                final Entry<String, Long> penalty = penaltyIterator.next();
                if (penalty.getValue() < now) {
                    penaltyIterator.remove();
                }
            }
        }

        // Fetch FlowFiles through a filter so that one invocation only handles a single signal id.
        final List<FlowFile> flowFiles = session.get(f -> {

            final String fSignalId = signalIdProperty.evaluateAttributeExpressions(f).getValue();

            // if the computed value is null, or empty, we transfer the FlowFile to failure relationship
            if (StringUtils.isBlank(fSignalId)) {
                // We can't penalize f before getting it from session, so keep it in a temporary list.
                // Note that this path does not go through acceptResultSupplier, so these FlowFiles
                // do not count towards the buffer count.
                logger.error("FlowFile {} has no attribute for given Release Signal Identifier", f);
                failedFilteringFlowFiles.add(f);
                return ACCEPT_AND_CONTINUE;
            }

            if (signalIdPenalties.containsKey(fSignalId)) {
                // This id is penalized.
                return REJECT_AND_CONTINUE;
            }

            final String targetSignalIdStr = targetSignalId.get();
            if (targetSignalIdStr == null) {
                // This is the first one.
                // Its signal id becomes the target id for the rest of this invocation.
                targetSignalId.set(fSignalId);
                return acceptResultSupplier.get();
            }

            if (targetSignalIdStr.equals(fSignalId)) {
                return acceptResultSupplier.get();
            }

            // A different signal id: leave the FlowFile in the queue for a later invocation.
            return REJECT_AND_CONTINUE;

        });

        final String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue();
        final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode);
        // Holds the signal fetched from the cache; stays null until the lookup below has run,
        // and remains null if no signal exists for the target id.
        final AtomicReference<Signal> signalRef = new AtomicReference<>();
        // This map contains original counts before those are consumed to release incoming FlowFiles.
        final HashMap<String, Long> originalSignalCounts = new HashMap<>();

        // Penalizes a FlowFile, clears its wait state and queues it for the failure relationship.
        // NOTE(review): clearWaitState is defined elsewhere; per the comments here it removes the
        // wait timer from the FlowFile - confirm against its definition.
        final Consumer<FlowFile> transferToFailure = flowFile -> {
            flowFile = session.penalize(flowFile);
            // This flowFile is now failed, our tracking is done, clear the timer
            flowFile = clearWaitState(session, flowFile);
            getFlowFilesFor.apply(REL_FAILURE).add(flowFile);
        };

        // Transfers all FlowFiles collected for one relationship, after copying signal attributes
        // onto each of them.
        final Consumer<Entry<Relationship, List<FlowFile>>> transferFlowFiles = routedFlowFiles -> {
            Relationship relationship = routedFlowFiles.getKey();

            if (REL_WAIT.equals(relationship)) {
                final String waitMode = context.getProperty(WAIT_MODE).getValue();

                if (WAIT_MODE_KEEP_IN_UPSTREAM.getValue().equals(waitMode)) {
                    // Transfer to self.
                    relationship = Relationship.SELF;
                }
            }
            // Effectively-final copy so the (possibly remapped) relationship can be read inside the lambda below.
            final Relationship finalRelationship = relationship;
            final List<FlowFile> flowFilesWithSignalAttributes = routedFlowFiles.getValue().stream()
                    .map(f -> {
                        if (REL_SUCCESS.equals(finalRelationship) || REL_EXPIRED.equals(finalRelationship)) {
                            // These flowFiles will be exiting the wait, clear the timer
                            f = clearWaitState(session, f);
                        }
                        // signalRef.get() is null on the early-return path and when no signal was found.
                        // NOTE(review): copySignalAttributes is defined elsewhere and is assumed to
                        // tolerate a null signal - confirm.
                        return copySignalAttributes(session, f, signalRef.get(),
                            originalSignalCounts,
                            replaceOriginalAttributes);
                    })
                    .collect(Collectors.toList());

            session.transfer(flowFilesWithSignalAttributes, relationship);
        };

        // FlowFiles without a usable signal id were accepted by the filter only so that they can
        // be failed here; take them out of the working list and route them to failure.
        failedFilteringFlowFiles.forEach(f -> {
            flowFiles.remove(f);
            transferToFailure.accept(f);
        });

        if (flowFiles.isEmpty()) {
            // If there was nothing but failed FlowFiles while filtering, transfer those and end immediately.
            processedFlowFiles.entrySet().forEach(transferFlowFiles);
            return;
        }

        // the cache client used to interact with the distributed cache
        final AtomicDistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class);
        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);

        // Non-null here: flowFiles is non-empty, so the filter accepted at least one FlowFile
        // with a non-blank signal id and recorded that id.
        final String signalId = targetSignalId.get();
        final Signal signal;

        // get notifying signal
        try {
            signal = protocol.getSignal(signalId);
            if (signal != null) {
                // Snapshot the counts before any release below consumes them.
                originalSignalCounts.putAll(signal.getCounts());
            }
            signalRef.set(signal);
        } catch (final IOException e) {
            throw new ProcessException(String.format("Failed to get signal for %s due to %s", signalId, e), e);
        }

        // Release parameters. These initial values are overwritten from the first FlowFile that
        // reaches candidate validation; they are only used when at least one candidate exists.
        String targetCounterName = null;
        long targetCount = 1;
        int releasableFlowFileCount = 1;

        // FlowFiles that are not failed or expired and for which a signal exists.
        final List<FlowFile> candidates = new ArrayList<>();

        for (FlowFile flowFile : flowFiles) {
            // Set wait start timestamp if it's not set yet
            String waitStartTimestamp = flowFile.getAttribute(WAIT_START_TIMESTAMP);
            if (waitStartTimestamp == null) {
                waitStartTimestamp = String.valueOf(System.currentTimeMillis());
                flowFile = session.putAttribute(flowFile, WAIT_START_TIMESTAMP, waitStartTimestamp);
            }

            long lWaitStartTimestamp;
            try {
                lWaitStartTimestamp = Long.parseLong(waitStartTimestamp);
            } catch (NumberFormatException nfe) {
                logger.error("{} has an invalid value '{}' on FlowFile {}", WAIT_START_TIMESTAMP, waitStartTimestamp, flowFile);
                transferToFailure.accept(flowFile);
                continue;
            }

            // check for expiration
            long expirationDuration = context.getProperty(EXPIRATION_DURATION)
                    .asTimePeriod(TimeUnit.MILLISECONDS);
            long now = System.currentTimeMillis();
            if (now > (lWaitStartTimestamp + expirationDuration)) {
                logger.info("FlowFile {} expired after {}ms", flowFile, (now - lWaitStartTimestamp));
                getFlowFilesFor.apply(REL_EXPIRED).add(flowFile);
                continue;
            }

            // If there's no signal yet, then we don't have to evaluate target counts.
            // The FlowFile simply keeps waiting; the loop moves on to the next one.
            if (signal == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("No release signal found for {} on FlowFile {} yet", signalId, flowFile);
                }
                getFlowFilesFor.apply(REL_WAIT).add(flowFile);
                continue;
            }

            // Fix target counter name and count from current FlowFile, if those are not set yet.
            // This block runs for every FlowFile until one is added to candidates, so the values
            // that are finally used come from the first FlowFile that passes validation.
            if (candidates.isEmpty()) {
                targetCounterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue();
                try {
                    targetCount = Long.parseLong(context.getProperty(TARGET_SIGNAL_COUNT).evaluateAttributeExpressions(flowFile).getValue());
                } catch (final NumberFormatException e) {
                    transferToFailure.accept(flowFile);
                    logger.error("Failed to parse targetCount when processing {} due to {}", flowFile, e, e);
                    continue;
                }
                try {
                    releasableFlowFileCount = Integer.parseInt(context.getProperty(RELEASABLE_FLOWFILE_COUNT).evaluateAttributeExpressions(flowFile).getValue());
                } catch (final NumberFormatException e) {
                    transferToFailure.accept(flowFile);
                    logger.error("Failed to parse releasableFlowFileCount when processing {} due to {}", flowFile, e, e);
                    continue;
                }
            }

            // FlowFile is now validated and added to candidates.
            candidates.add(flowFile);
        }

        // waitCompleted: the signal has nothing left and should be completed in the cache.
        // waitProgressed: at least one FlowFile was released, so the updated signal must be written back.
        boolean waitCompleted = false;
        boolean waitProgressed = false;
        if (signal != null && !candidates.isEmpty()) {

            if (releasableFlowFileCount > 0) {
                // Let the signal split the candidates into released and still-waiting FlowFiles.
                // NOTE(review): releaseCandidates is defined on Signal elsewhere; it presumably
                // consumes counts from the signal as it releases - confirm.
                signal.releaseCandidates(targetCounterName, targetCount, releasableFlowFileCount, candidates,
                        released -> getFlowFilesFor.apply(REL_SUCCESS).addAll(released),
                        waiting -> getFlowFilesFor.apply(REL_WAIT).addAll(waiting));
                waitCompleted = signal.getTotalCount() == 0 && signal.getReleasableCount() == 0;
                waitProgressed = !getFlowFilesFor.apply(REL_SUCCESS).isEmpty();

            } else {
                // A releasable count of zero or less: all candidates are released together once the
                // target count is reached, against the total count when no counter name is given.
                // Neither flag is set on this path, so the signal in the cache is left unchanged.
                boolean reachedTargetCount = StringUtils.isBlank(targetCounterName)
                        ? signal.isTotalCountReached(targetCount)
                        : signal.isCountReached(targetCounterName, targetCount);

                if (reachedTargetCount) {
                    getFlowFilesFor.apply(REL_SUCCESS).addAll(candidates);
                } else {
                    getFlowFilesFor.apply(REL_WAIT).addAll(candidates);
                }
            }
        }

        // Transfer FlowFiles.
        processedFlowFiles.entrySet().forEach(transferFlowFiles);

        // Penalize signal id if no FlowFile transferred to success.
        // The penalty is applied whenever the success list is empty, including when every
        // FlowFile in this batch was routed to expired or failure.
        final PropertyValue waitPenaltyDuration = context.getProperty(WAIT_PENALTY_DURATION);
        if (waitPenaltyDuration.isSet() && getFlowFilesFor.apply(REL_SUCCESS).isEmpty()) {
            signalIdPenalties.put(signalId, System.currentTimeMillis() + waitPenaltyDuration.asTimePeriod(TimeUnit.MILLISECONDS));
        }

        // Update signal if needed.
        try {
            if (waitCompleted) {
                protocol.complete(signalId);
            } else if (waitProgressed) {
                protocol.replace(signal);
            }

        } catch (final IOException e) {
            // The transfers above have not been committed by this method; roll the session back
            // so they are discarded when the cache could not be updated.
            session.rollback();
            throw new ProcessException(String.format("Unable to communicate with cache while updating %s due to %s", signalId, e), e);
        }

    }