libminifi/include/provenance/Provenance.h (276 lines of code) (raw):
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <thread>
#include <vector>
#include "core/Core.h"
#include "core/SerializableComponent.h"
#include "core/Repository.h"
#include "core/Property.h"
#include "properties/Configure.h"
#include "Connection.h"
#include "FlowFileRecord.h"
#include "core/logging/LoggerFactory.h"
#include "ResourceClaim.h"
#include "utils/gsl.h"
#include "utils/Id.h"
#include "utils/TimeUtil.h"
namespace org::apache::nifi::minifi::provenance {
class ProvenanceEventRecord : public core::SerializableComponent {
public:
enum ProvenanceEventType {
/**
* A CREATE event is used when a FlowFile is generated from data that was
* not received from a remote system or external process
*/
CREATE,
/**
* Indicates a provenance event for receiving data from an external process. This Event Type
* is expected to be the first event for a FlowFile. As such, a Processor that receives data
* from an external source and uses that data to replace the content of an existing FlowFile
* should use the {@link #FETCH} event type, rather than the RECEIVE event type.
*/
RECEIVE,
/**
* Indicates that the contents of a FlowFile were overwritten using the contents of some
* external resource. This is similar to the {@link #RECEIVE} event but varies in that
* RECEIVE events are intended to be used as the event that introduces the FlowFile into
* the system, whereas FETCH is used to indicate that the contents of an existing FlowFile
* were overwritten.
*/
FETCH,
/**
* Indicates a provenance event for sending data to an external process
*/
SEND,
/**
* Indicates that the contents of a FlowFile were downloaded by a user or external entity.
*/
DOWNLOAD,
/**
* Indicates a provenance event for the conclusion of an object's life for
* some reason other than object expiration
*/
DROP,
/**
* Indicates a provenance event for the conclusion of an object's life due
* to the fact that the object could not be processed in a timely manner
*/
EXPIRE,
/**
* FORK is used to indicate that one or more FlowFile was derived from a
* parent FlowFile.
*/
FORK,
/**
* JOIN is used to indicate that a single FlowFile is derived from joining
* together multiple parent FlowFiles.
*/
JOIN,
/**
* CLONE is used to indicate that a FlowFile is an exact duplicate of its
* parent FlowFile.
*/
CLONE,
/**
* CONTENT_MODIFIED is used to indicate that a FlowFile's content was
* modified in some way. When using this Event Type, it is advisable to
* provide details about how the content is modified.
*/
CONTENT_MODIFIED,
/**
* ATTRIBUTES_MODIFIED is used to indicate that a FlowFile's attributes were
* modified in some way. This event is not needed when another event is
* reported at the same time, as the other event will already contain all
* FlowFile attributes.
*/
ATTRIBUTES_MODIFIED,
/**
* ROUTE is used to show that a FlowFile was routed to a specified
* {@link org.apache.nifi.processor.Relationship Relationship} and should provide
* information about why the FlowFile was routed to this relationship.
*/
ROUTE,
/**
* Indicates a provenance event for adding additional information such as a
* new linkage to a new URI or UUID
*/
ADDINFO,
/**
* Indicates a provenance event for replaying a FlowFile. The UUID of the
* event will indicate the UUID of the original FlowFile that is being
* replayed. The event will contain exactly one Parent UUID that is also the
* UUID of the FlowFile that is being replayed and exactly one Child UUID
* that is the UUID of the a newly created FlowFile that will be re-queued
* for processing.
*/
REPLAY
};
static const char *ProvenanceEventTypeStr[REPLAY + 1];
ProvenanceEventRecord(ProvenanceEventType event, std::string componentId, std::string componentType);
ProvenanceEventRecord()
: core::SerializableComponent(core::className<ProvenanceEventRecord>()) {
_eventTime = std::chrono::system_clock::now();
}
virtual ~ProvenanceEventRecord() = default;
utils::Identifier getEventId() const {
return getUUID();
}
void setEventId(const utils::Identifier &id) {
setUUID(id);
}
std::map<std::string, std::string> getAttributes() const {
return _attributes;
}
uint64_t getFileSize() const {
return _size;
}
uint64_t getFileOffset() const {
return _offset;
}
std::chrono::system_clock::time_point getFlowFileEntryDate() const {
return _entryDate;
}
std::chrono::system_clock::time_point getlineageStartDate() const {
return _lineageStartDate;
}
std::chrono::system_clock::time_point getEventTime() const {
return _eventTime;
}
std::chrono::milliseconds getEventDuration() const {
return _eventDuration;
}
void setEventDuration(std::chrono::milliseconds duration) {
_eventDuration = duration;
}
ProvenanceEventType getEventType() const {
return _eventType;
}
std::string getComponentId() const {
return _componentId;
}
std::string getComponentType() const {
return _componentType;
}
utils::Identifier getFlowFileUuid() const {
return flow_uuid_;
}
std::string getContentFullPath() const {
return _contentFullPath;
}
std::vector<utils::Identifier> getLineageIdentifiers() const {
return _lineageIdentifiers;
}
std::string getDetails() const {
return _details;
}
void setDetails(const std::string& details) {
_details = details;
}
std::string getTransitUri() {
return _transitUri;
}
void setTransitUri(const std::string& uri) {
_transitUri = uri;
}
std::string getSourceSystemFlowFileIdentifier() const {
return _sourceSystemFlowFileIdentifier;
}
void setSourceSystemFlowFileIdentifier(const std::string& identifier) {
_sourceSystemFlowFileIdentifier = identifier;
}
std::vector<utils::Identifier> getParentUuids() const {
return _parentUuids;
}
void addParentUuid(const utils::Identifier& uuid) {
if (std::find(_parentUuids.begin(), _parentUuids.end(), uuid) != _parentUuids.end())
return;
else
_parentUuids.push_back(uuid);
}
void addParentFlowFile(const std::shared_ptr<core::FlowFile>& flow) {
addParentUuid(flow->getUUID());
}
void removeParentUuid(const utils::Identifier& uuid) {
_parentUuids.erase(std::remove(_parentUuids.begin(), _parentUuids.end(), uuid), _parentUuids.end());
}
void removeParentFlowFile(const std::shared_ptr<core::FlowFile>& flow) {
removeParentUuid(flow->getUUID());
}
std::vector<utils::Identifier> getChildrenUuids() const {
return _childrenUuids;
}
void addChildUuid(const utils::Identifier& uuid) {
if (std::find(_childrenUuids.begin(), _childrenUuids.end(), uuid) != _childrenUuids.end())
return;
else
_childrenUuids.push_back(uuid);
}
void addChildFlowFile(const std::shared_ptr<core::FlowFile>& flow) {
addChildUuid(flow->getUUID());
return;
}
void removeChildUuid(const utils::Identifier& uuid) {
_childrenUuids.erase(std::remove(_childrenUuids.begin(), _childrenUuids.end(), uuid), _childrenUuids.end());
}
void removeChildFlowFile(const std::shared_ptr<core::FlowFile>& flow) {
removeChildUuid(flow->getUUID());
}
std::string getAlternateIdentifierUri() const {
return _alternateIdentifierUri;
}
void setAlternateIdentifierUri(const std::string& uri) {
_alternateIdentifierUri = uri;
}
std::string getRelationship() const {
return _relationship;
}
void setRelationship(const std::string& relation) {
_relationship = relation;
}
std::string getSourceQueueIdentifier() const {
return _sourceQueueIdentifier;
}
void setSourceQueueIdentifier(const std::string& identifier) {
_sourceQueueIdentifier = identifier;
}
void fromFlowFile(const std::shared_ptr<core::FlowFile> &flow) {
_entryDate = flow->getEntryDate();
_lineageStartDate = flow->getlineageStartDate();
_lineageIdentifiers = flow->getlineageIdentifiers();
flow_uuid_ = flow->getUUID();
_attributes = flow->getAttributes();
_size = flow->getSize();
_offset = flow->getOffset();
if (flow->getConnection())
_sourceQueueIdentifier = flow->getConnection()->getName();
if (flow->getResourceClaim()) {
_contentFullPath = flow->getResourceClaim()->getContentFullPath();
}
}
bool serialize(io::OutputStream& output_stream) override;
bool deserialize(io::InputStream &input_stream) override;
bool loadFromRepository(const std::shared_ptr<core::Repository> &repo);
protected:
ProvenanceEventType _eventType;
// Date at which the event was created
std::chrono::system_clock::time_point _eventTime{};
// Date at which the flow file entered the flow
std::chrono::system_clock::time_point _entryDate{};
// Date at which the origin of this flow file entered the flow
std::chrono::system_clock::time_point _lineageStartDate{};
std::chrono::milliseconds _eventDuration{};
std::string _componentId;
std::string _componentType;
// Size in bytes of the data corresponding to this flow file
uint64_t _size;
utils::Identifier flow_uuid_;
uint64_t _offset;
std::string _contentFullPath;
std::map<std::string, std::string> _attributes;
// UUID string for all parents
std::vector<utils::Identifier> _lineageIdentifiers;
std::string _transitUri;
std::string _sourceSystemFlowFileIdentifier;
std::vector<utils::Identifier> _parentUuids;
std::vector<utils::Identifier> _childrenUuids;
std::string _details;
std::string _sourceQueueIdentifier;
std::string _relationship;
std::string _alternateIdentifierUri;
private:
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
ProvenanceEventRecord(const ProvenanceEventRecord &parent);
ProvenanceEventRecord &operator=(const ProvenanceEventRecord &parent);
static std::shared_ptr<core::logging::Logger> logger_;
static std::shared_ptr<utils::IdGenerator> id_generator_;
};
class ProvenanceReporter {
public:
ProvenanceReporter(std::shared_ptr<core::Repository> repo, std::string componentId, std::string componentType)
: logger_(core::logging::LoggerFactory<ProvenanceReporter>::getLogger()) {
_componentId = componentId;
_componentType = componentType;
repo_ = repo;
}
virtual ~ProvenanceReporter() {
clear();
}
std::set<std::shared_ptr<ProvenanceEventRecord>> getEvents() const {
return _events;
}
void add(const std::shared_ptr<ProvenanceEventRecord> &event) {
_events.insert(event);
}
void remove(const std::shared_ptr<ProvenanceEventRecord> &event) {
if (_events.find(event) != _events.end()) {
_events.erase(event);
}
}
void clear() {
_events.clear();
}
void commit();
void create(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail);
void route(const std::shared_ptr<core::FlowFile>& flow, const core::Relationship& relation, const std::string& detail, std::chrono::milliseconds processingDuration);
void modifyAttributes(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail);
void modifyContent(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail, std::chrono::milliseconds processingDuration);
void clone(const std::shared_ptr<core::FlowFile>& parent, const std::shared_ptr<core::FlowFile>& child);
void join(const std::vector<std::shared_ptr<core::FlowFile>>& parents, const std::shared_ptr<core::FlowFile>& child, const std::string& detail, std::chrono::milliseconds processingDuration);
void fork(const std::vector<std::shared_ptr<core::FlowFile>>& children, const std::shared_ptr<core::FlowFile>& parent, const std::string& detail, std::chrono::milliseconds processingDuration);
void expire(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail);
void drop(const std::shared_ptr<core::FlowFile>& flow, const std::string& reason);
void send(const std::shared_ptr<core::FlowFile>& flow, const std::string& transitUri, const std::string& detail, std::chrono::milliseconds processingDuration, bool force);
void fetch(const std::shared_ptr<core::FlowFile>& flow, const std::string& transitUri, const std::string& detail, std::chrono::milliseconds processingDuration);
void receive(const std::shared_ptr<core::FlowFile>& flow, const std::string& transitUri,
const std::string& sourceSystemFlowFileIdentifier, const std::string& detail, std::chrono::milliseconds processingDuration);
protected:
std::shared_ptr<ProvenanceEventRecord> allocate(ProvenanceEventRecord::ProvenanceEventType eventType, const std::shared_ptr<core::FlowFile>& flow) {
if (repo_->isNoop()) {
return nullptr;
}
auto event = std::make_shared<ProvenanceEventRecord>(eventType, _componentId, _componentType);
if (event)
event->fromFlowFile(flow);
return event;
}
std::string _componentId;
std::string _componentType;
private:
std::shared_ptr<core::logging::Logger> logger_;
std::set<std::shared_ptr<ProvenanceEventRecord>> _events;
std::shared_ptr<core::Repository> repo_;
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
ProvenanceReporter(const ProvenanceReporter &parent);
ProvenanceReporter &operator=(const ProvenanceReporter &parent);
};
} // namespace org::apache::nifi::minifi::provenance