libminifi/include/provenance/Provenance.h (258 lines of code) (raw):

/** * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <algorithm> #include <atomic> #include <cstdint> #include <cstring> #include <iostream> #include <map> #include <memory> #include <set> #include <string> #include <thread> #include <vector> #include "core/Core.h" #include "core/SerializableComponent.h" #include "core/Repository.h" #include "core/Property.h" #include "properties/Configure.h" #include "Connection.h" #include "FlowFileRecord.h" #include "core/logging/LoggerFactory.h" #include "ResourceClaim.h" #include "utils/gsl.h" #include "utils/Id.h" #include "utils/TimeUtil.h" #include "minifi-cpp/provenance/Provenance.h" namespace org::apache::nifi::minifi::provenance { class ProvenanceEventRecordImpl : public core::SerializableComponentImpl, public virtual ProvenanceEventRecord { public: static const char *ProvenanceEventTypeStr[REPLAY + 1]; ProvenanceEventRecordImpl(ProvenanceEventType event, std::string componentId, std::string componentType); ProvenanceEventRecordImpl() : core::SerializableComponentImpl(core::className<ProvenanceEventRecord>()) { _eventTime = std::chrono::system_clock::now(); } ~ProvenanceEventRecordImpl() override = default; utils::Identifier getEventId() const override { return getUUID(); } void setEventId(const utils::Identifier &id) override { setUUID(id); } std::map<std::string, std::string> getAttributes() const override { return _attributes; } uint64_t getFileSize() const override { return _size; } uint64_t getFileOffset() const override { return _offset; } std::chrono::system_clock::time_point getFlowFileEntryDate() const override { return _entryDate; } std::chrono::system_clock::time_point getlineageStartDate() const override { return _lineageStartDate; } std::chrono::system_clock::time_point getEventTime() const override { return _eventTime; } std::chrono::milliseconds getEventDuration() const override { return _eventDuration; } void setEventDuration(std::chrono::milliseconds duration) override { _eventDuration = duration; } ProvenanceEventType getEventType() const override { return _eventType; } std::string getComponentId() const override { return _componentId; } std::string getComponentType() const override { return _componentType; } utils::Identifier getFlowFileUuid() const override { return flow_uuid_; } std::string getContentFullPath() const override { return _contentFullPath; } std::vector<utils::Identifier> getLineageIdentifiers() const override { return _lineageIdentifiers; } std::string getDetails() const override { return _details; } void setDetails(const std::string& details) override { _details = details; } std::string getTransitUri() override { return _transitUri; } void setTransitUri(const std::string& uri) override { _transitUri = uri; } std::string getSourceSystemFlowFileIdentifier() const override { return _sourceSystemFlowFileIdentifier; } void setSourceSystemFlowFileIdentifier(const std::string& identifier) override { _sourceSystemFlowFileIdentifier = identifier; } std::vector<utils::Identifier> getParentUuids() const override { return _parentUuids; } void addParentUuid(const utils::Identifier& uuid) override { if (std::find(_parentUuids.begin(), _parentUuids.end(), uuid) != _parentUuids.end()) return; else _parentUuids.push_back(uuid); } void addParentFlowFile(const core::FlowFile& flow_file) override { addParentUuid(flow_file.getUUID()); } void removeParentUuid(const utils::Identifier& uuid) override { _parentUuids.erase(std::remove(_parentUuids.begin(), _parentUuids.end(), uuid), _parentUuids.end()); } void removeParentFlowFile(const core::FlowFile& flow_file) override { removeParentUuid(flow_file.getUUID()); } std::vector<utils::Identifier> getChildrenUuids() const override { return _childrenUuids; } void addChildUuid(const utils::Identifier& uuid) override { if (std::find(_childrenUuids.begin(), _childrenUuids.end(), uuid) != _childrenUuids.end()) return; else _childrenUuids.push_back(uuid); } void addChildFlowFile(const core::FlowFile& flow_file) override { addChildUuid(flow_file.getUUID()); return; } void removeChildUuid(const utils::Identifier& uuid) override { _childrenUuids.erase(std::remove(_childrenUuids.begin(), _childrenUuids.end(), uuid), _childrenUuids.end()); } void removeChildFlowFile(const core::FlowFile& flow_file) override { removeChildUuid(flow_file.getUUID()); } std::string getAlternateIdentifierUri() const override { return _alternateIdentifierUri; } void setAlternateIdentifierUri(const std::string& uri) override { _alternateIdentifierUri = uri; } std::string getRelationship() const override { return _relationship; } void setRelationship(const std::string& relation) override { _relationship = relation; } std::string getSourceQueueIdentifier() const override { return _sourceQueueIdentifier; } void setSourceQueueIdentifier(const std::string& identifier) override { _sourceQueueIdentifier = identifier; } void fromFlowFile(const core::FlowFile& flow_file) override { _entryDate = flow_file.getEntryDate(); _lineageStartDate = flow_file.getlineageStartDate(); _lineageIdentifiers = flow_file.getlineageIdentifiers(); flow_uuid_ = flow_file.getUUID(); _attributes = flow_file.getAttributes(); _size = flow_file.getSize(); _offset = flow_file.getOffset(); if (flow_file.getConnection()) _sourceQueueIdentifier = flow_file.getConnection()->getName(); if (flow_file.getResourceClaim()) { _contentFullPath = flow_file.getResourceClaim()->getContentFullPath(); } } bool serialize(io::OutputStream& output_stream) override; bool deserialize(io::InputStream &input_stream) override; bool loadFromRepository(const std::shared_ptr<core::Repository> &repo) override; protected: ProvenanceEventType _eventType; // Date at which the event was created std::chrono::system_clock::time_point _eventTime{}; // Date at which the flow file entered the flow std::chrono::system_clock::time_point _entryDate{}; // Date at which the origin of this flow file entered the flow std::chrono::system_clock::time_point _lineageStartDate{}; std::chrono::milliseconds _eventDuration{}; std::string _componentId; std::string _componentType; // Size in bytes of the data corresponding to this flow file uint64_t _size = 0; utils::Identifier flow_uuid_; uint64_t _offset = 0; std::string _contentFullPath; std::map<std::string, std::string> _attributes; // UUID string for all parents std::vector<utils::Identifier> _lineageIdentifiers; std::string _transitUri; std::string _sourceSystemFlowFileIdentifier; std::vector<utils::Identifier> _parentUuids; std::vector<utils::Identifier> _childrenUuids; std::string _details; std::string _sourceQueueIdentifier; std::string _relationship; std::string _alternateIdentifierUri; private: ProvenanceEventRecordImpl(const ProvenanceEventRecordImpl &parent); ProvenanceEventRecordImpl &operator=(const ProvenanceEventRecordImpl &parent); static std::shared_ptr<core::logging::Logger> logger_; static std::shared_ptr<utils::IdGenerator> id_generator_; }; class ProvenanceReporterImpl : public virtual ProvenanceReporter { public: ProvenanceReporterImpl(std::shared_ptr<core::Repository> repo, std::string componentId, std::string componentType) : logger_(core::logging::LoggerFactory<ProvenanceReporter>::getLogger()) { _componentId = componentId; _componentType = componentType; repo_ = repo; } ~ProvenanceReporterImpl() override { clear(); } std::set<std::shared_ptr<ProvenanceEventRecord>> getEvents() const override { return _events; } void add(const std::shared_ptr<ProvenanceEventRecord> &event) override { _events.insert(event); } void remove(const std::shared_ptr<ProvenanceEventRecord> &event) override { if (_events.find(event) != _events.end()) { _events.erase(event); } } void clear() override { _events.clear(); } void commit() override; void create(const core::FlowFile& flow_file, const std::string& detail) override; void route(const core::FlowFile& flow_file, const core::Relationship& relation, const std::string& detail, std::chrono::milliseconds processingDuration) override; void modifyAttributes(const core::FlowFile& flow_file, const std::string& detail) override; void modifyContent(const core::FlowFile& flow_file, const std::string& detail, std::chrono::milliseconds processingDuration) override; void clone(const core::FlowFile& parent, const core::FlowFile& child) override; void expire(const core::FlowFile& flow_file, const std::string& detail) override; void drop(const core::FlowFile& flow_file, const std::string& reason) override; void send(const core::FlowFile& flow_file, const std::string& transitUri, const std::string& detail, std::chrono::milliseconds processingDuration, bool force) override; void fetch(const core::FlowFile& flow_file, const std::string& transitUri, const std::string& detail, std::chrono::milliseconds processingDuration) override; void receive(const core::FlowFile& flow_file, const std::string& transitUri, const std::string& sourceSystemFlowFileIdentifier, const std::string& detail, std::chrono::milliseconds processingDuration) override; protected: std::shared_ptr<ProvenanceEventRecord> allocate(ProvenanceEventRecord::ProvenanceEventType eventType, const core::FlowFile& flow_file) { if (repo_->isNoop()) { return nullptr; } auto event = std::make_shared<ProvenanceEventRecordImpl>(eventType, _componentId, _componentType); if (event) event->fromFlowFile(flow_file); return event; } std::string _componentId; std::string _componentType; private: std::shared_ptr<core::logging::Logger> logger_; std::set<std::shared_ptr<ProvenanceEventRecord>> _events; std::shared_ptr<core::Repository> repo_; ProvenanceReporterImpl(const ProvenanceReporterImpl &parent); ProvenanceReporterImpl &operator=(const ProvenanceReporterImpl &parent); }; } // namespace org::apache::nifi::minifi::provenance