protected void updateProvenanceRepo(final Checkpoint checkpoint)

in nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java [823:1029]


    protected void updateProvenanceRepo(final Checkpoint checkpoint) {
        // Update Provenance Repository
        final ProvenanceEventRepository provenanceRepo = context.getProvenanceRepository();

        // We need to de-dupe the events that we've created and those reported to the provenance reporter,
        // in case the Processor developer submitted the same events to the reporter. We use a LinkedHashSet
        // so that the events are de-duped while still being submitted in the proper order.
        final Set<ProvenanceEventRecord> recordsToSubmit = new LinkedHashSet<>();
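        // Tracks, per FlowFile UUID, which ProvenanceEventTypes have already been registered (one bit per event-type ordinal),
        // so that we can later detect FlowFiles that are still missing a creation-style event or need an ATTRIBUTES_MODIFIED event.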
        final Map<String, BitSet> eventTypesPerFlowFileId = new HashMap<>();

        final Set<ProvenanceEventRecord> processorGenerated = checkpoint.reportedEvents;

        // We first want to submit FORK events because if the Processor is going to create events against
        // a FlowFile, that FlowFile needs to be shown to be created first.
        // However, if the Processor has generated a FORK event, we don't want to use the Framework-created one --
        // we prefer the event generated by the Processor. We can determine this by checking whether the Set of events
        // generated by the Processor contains any of the FORK events that we generated.
        for (final Map.Entry<FlowFile, ProvenanceEventBuilder> entry : checkpoint.forkEventBuilders.entrySet()) {
            final ProvenanceEventBuilder builder = entry.getValue();
            final FlowFile flowFile = entry.getKey();

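            // Make sure the event builder reflects the FlowFile's current content claim information before the event is built.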
            updateEventContentClaims(builder, flowFile, checkpoint.getRecord(flowFile));
            final ProvenanceEventRecord event = builder.build();

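            // Only submit the FORK event if it actually has children and has not been rendered spurious by FlowFile removals in this session.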
            if (!event.getChildUuids().isEmpty() && !isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
                // If the Processor did not report this FORK event itself, submit the framework-generated one.
                if (!processorGenerated.contains(event)) {
                    recordsToSubmit.add(event);
                }

                // Register the FORK event for each child and each parent.
                for (final String childUuid : event.getChildUuids()) {
                    addEventType(eventTypesPerFlowFileId, childUuid, event.getEventType());
                }
                for (final String parentUuid : event.getParentUuids()) {
                    addEventType(eventTypesPerFlowFileId, parentUuid, event.getEventType());
                }
            }
        }

        // Next, process any JOIN events because we need to ensure that the JOINed FlowFile is created before any processor-emitted events occur.
        for (final Map.Entry<FlowFile, List<ProvenanceEventRecord>> entry : checkpoint.generatedProvenanceEvents.entrySet()) {
            for (final ProvenanceEventRecord event : entry.getValue()) {
                final ProvenanceEventType eventType = event.getEventType();
                if (eventType == ProvenanceEventType.JOIN) {
                    recordsToSubmit.add(event);
                    addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
                }
            }
        }

        // Now add any Processor-reported events.
        for (final ProvenanceEventRecord event : processorGenerated) {
            if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
                continue;
            }

            // Check if the event indicates that the FlowFile was routed to the same
            // connection from which it was pulled (and only this connection). If so, discard the event.
            if (isSpuriousRouteEvent(event, checkpoint.records)) {
                continue;
            }

            recordsToSubmit.add(event);
            addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());

            final List<String> childUuids = event.getChildUuids();
            if (childUuids != null) {
                for (final String childUuid : childUuids) {
                    addEventType(eventTypesPerFlowFileId, childUuid, event.getEventType());
                }
            }
        }

        // Finally, add any other events that we may have generated.
        for (final List<ProvenanceEventRecord> eventList : checkpoint.generatedProvenanceEvents.values()) {
            for (final ProvenanceEventRecord event : eventList) {
                if (event.getEventType() == ProvenanceEventType.JOIN) {
                    continue; // JOIN events are handled above.
                }

                if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
                    continue;
                }

                recordsToSubmit.add(event);
                addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
            }
        }

        // Check if content or attributes changed. If so, register the appropriate events.
        for (final StandardRepositoryRecord repoRecord : checkpoint.records.values()) {
            final ContentClaim original = repoRecord.getOriginalClaim();
            final ContentClaim current = repoRecord.getCurrentClaim();

            final boolean contentChanged = !Objects.equals(original, current);
            final FlowFileRecord curFlowFile = repoRecord.getCurrent();
            final String flowFileId = curFlowFile.getAttribute(CoreAttributes.UUID.key());
            boolean eventAdded = false;

            if (checkpoint.removedFlowFiles.contains(flowFileId)) {
                continue;
            }

            final boolean newFlowFile = repoRecord.getOriginal() == null;
            if (contentChanged && !newFlowFile) {
                recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CONTENT_MODIFIED).build());
                addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.CONTENT_MODIFIED);
                eventAdded = true;
            }

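            // If the FlowFile was created in this session but no creation-style event (CREATE, FORK, CLONE, JOIN,
            // RECEIVE, or FETCH) has been registered for it, fall back to emitting a CREATE event.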
            if (checkpoint.createdFlowFiles.contains(flowFileId)) {
                final BitSet registeredTypes = eventTypesPerFlowFileId.get(flowFileId);
                boolean creationEventRegistered = false;
                if (registeredTypes != null) {
                    if (registeredTypes.get(ProvenanceEventType.CREATE.ordinal())
                        || registeredTypes.get(ProvenanceEventType.FORK.ordinal())
                        || registeredTypes.get(ProvenanceEventType.CLONE.ordinal())
                        || registeredTypes.get(ProvenanceEventType.JOIN.ordinal())
                        || registeredTypes.get(ProvenanceEventType.RECEIVE.ordinal())
                        || registeredTypes.get(ProvenanceEventType.FETCH.ordinal())) {

                        creationEventRegistered = true;
                    }
                }

                if (!creationEventRegistered) {
                    recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CREATE).build());
                    eventAdded = true;
                }
            }

            if (!eventAdded && !repoRecord.getUpdatedAttributes().isEmpty() && curFlowFile.getAttribute(retryAttribute) == null) {
                // We generate an ATTRIBUTES_MODIFIED event only if no other event has been
                // created for the FlowFile. We do this because all events contain both the
                // newest and the original attributes, so generating an ATTRIBUTES_MODIFIED
                // event is redundant if another event already exists.
                // We also don't generate an ATTRIBUTES_MODIFIED event when the FlowFile is marked for retry.
                if (!eventTypesPerFlowFileId.containsKey(flowFileId)) {
                    recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.ATTRIBUTES_MODIFIED).build());
                    addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.ATTRIBUTES_MODIFIED);
                }
            }
        }

        // We want to submit the 'recordsToSubmit' collection, followed by the auto-terminated events, to the Provenance Repository.
        // We want to do this with a single call to ProvenanceEventRepository#registerEvents because it may be much more efficient
        // to do so.
        // However, we want to modify the events in 'recordsToSubmit' to obtain the data from the most recent version of the FlowFiles
        // (except for SEND, UPLOAD, and CLONE events); see the note below as to why this is.
        // Therefore, we create an Iterable that can iterate over each of these events, modifying them as needed and returning them
        // in the appropriate order. This avoids the unnecessary step of creating an intermediate List and adding all of the values
        // to that List.
        // This is done in a similar vein to how Java streams work, iterating over the events and returning a processed version
        // one at a time as opposed to iterating over the entire Collection and putting the results in another Collection.
        final Map<String, FlowFileRecord> flowFileRecordMap = new HashMap<>();
        for (final StandardRepositoryRecord repoRecord : checkpoint.records.values()) {
            final FlowFileRecord flowFile = repoRecord.getCurrent();
            flowFileRecordMap.put(flowFile.getAttribute(CoreAttributes.UUID.key()), flowFile);
        }

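        // Capture the commit timestamp once so that the same value is passed to enrich() for every event submitted below.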
        final long commitNanos = System.nanoTime();
        final List<ProvenanceEventRecord> autoTermEvents = checkpoint.autoTerminatedEvents;
        final Iterable<ProvenanceEventRecord> iterable = new Iterable<>() {
            final Iterator<ProvenanceEventRecord> recordsToSubmitIterator = recordsToSubmit.iterator();
            final Iterator<ProvenanceEventRecord> autoTermIterator = autoTermEvents == null ? null : autoTermEvents.iterator();
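            // Note: because the source iterators are fields of the Iterable, it can only be consumed once,
            // which is sufficient for the single registerEvents call below.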

            @Override
            public Iterator<ProvenanceEventRecord> iterator() {
                return new Iterator<>() {
                    @Override
                    public boolean hasNext() {
                        return recordsToSubmitIterator.hasNext() || autoTermIterator != null && autoTermIterator.hasNext();
                    }

                    @Override
                    public ProvenanceEventRecord next() {
                        if (recordsToSubmitIterator.hasNext()) {
                            final ProvenanceEventRecord rawEvent = recordsToSubmitIterator.next();

                            // Update the Provenance Event Record with all of the info that we know about the event.
                            // For SEND, UPLOAD, and CLONE events, we do not want to update the FlowFile info on the Event, because the event
                            // should reflect the FlowFile as it existed at the time of the event (e.g., as it was sent to the remote system).
                            // For all other events, we want to use the representation of the FlowFile as it is committed, as this is the only
                            // way in which it really exists in our system -- all other representations are volatile representations that have
                            // not been exposed.
                            final boolean isUpdateAttributesAndContent = rawEvent.getEventType() != ProvenanceEventType.SEND && rawEvent.getEventType() != ProvenanceEventType.UPLOAD
                                    && rawEvent.getEventType() != ProvenanceEventType.CLONE;
                            return enrich(rawEvent, flowFileRecordMap, checkpoint.records, isUpdateAttributesAndContent, commitNanos);
                        } else if (autoTermIterator != null && autoTermIterator.hasNext()) {
                            return enrich(autoTermIterator.next(), flowFileRecordMap, checkpoint.records, true, commitNanos);
                        }

                        throw new NoSuchElementException();
                    }

                    @Override
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
            }
        };

        provenanceRepo.registerEvents(iterable);
    }
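
For reference, the addEventType bookkeeping used throughout this method amounts to recording each event type's ordinal in a
per-UUID BitSet, which is how the creation-event check above queries it. The helper below is a minimal sketch of that contract
(assuming java.util.Map and java.util.BitSet); the actual private helper in StandardProcessSession may differ in name and details.

    // Hypothetical sketch: mark the given event type as registered for the given FlowFile UUID.
    private static void addEventType(final Map<String, BitSet> map, final String flowFileUuid, final ProvenanceEventType eventType) {
        // Lazily create the BitSet for this UUID, then set the bit corresponding to the event type's ordinal.
        final BitSet eventTypes = map.computeIfAbsent(flowFileUuid, key -> new BitSet());
        eventTypes.set(eventType.ordinal());
    }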