libminifi/src/provenance/Provenance.cpp (518 lines of code) (raw):
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "provenance/Provenance.h"
#include <cstdint>
#include <memory>
#include <string>
#include <vector>
#include <list>
#include <utility>
#include "core/Repository.h"
#include "io/BufferStream.h"
#include "core/logging/Logger.h"
#include "core/Relationship.h"
#include "FlowController.h"
#include "utils/gsl.h"
namespace org::apache::nifi::minifi::provenance {
// Static ID generator handle of ProvenanceEventRecord, obtained once from utils::IdGenerator::getIdGenerator().
std::shared_ptr<utils::IdGenerator> ProvenanceEventRecord::id_generator_ = utils::IdGenerator::getIdGenerator();
// Static logger of ProvenanceEventRecord, obtained from the LoggerFactory specialization for this class.
std::shared_ptr<core::logging::Logger> ProvenanceEventRecord::logger_ = core::logging::LoggerFactory<ProvenanceEventRecord>::getLogger();
// Textual names of the provenance event types. The array holds REPLAY + 1 entries,
// so it is presumably indexed by the ProvenanceEventType value and must stay in the
// same order as the enum declaration — confirm against the header when adding types.
const char *ProvenanceEventRecord::ProvenanceEventTypeStr[REPLAY + 1] = { "CREATE", "RECEIVE", "FETCH", "SEND", "DOWNLOAD", "DROP", "EXPIRE", "FORK", "JOIN", "CLONE", "CONTENT_MODIFIED",
"ATTRIBUTES_MODIFIED", "ROUTE", "ADDINFO", "REPLAY" };
ProvenanceEventRecord::ProvenanceEventRecord(ProvenanceEventRecord::ProvenanceEventType event, std::string componentId, std::string componentType)
: core::SerializableComponent(core::className<ProvenanceEventRecord>()),
_eventType(event),
_componentId(std::move(componentId)),
_componentType(std::move(componentType)),
_eventTime(std::chrono::system_clock::now()) {
}
/**
 * Loads this record from a repository.
 *
 * The stored value is looked up under this record's UUID string and then
 * deserialized into this object.
 *
 * @param repo repository to read from
 * @return true when the entry was found and deserialized successfully; false when
 *         the repository is null, this record's UUID is nil, the entry is missing,
 *         or deserialization fails
 */
bool ProvenanceEventRecord::loadFromRepository(const std::shared_ptr<core::Repository> &repo) {
  if (nullptr == repo) {
    logger_->log_error("Repo could not be assigned");
    return false;
  }
  // A nil UUID cannot be used as a lookup key; report it separately from a missing repository.
  if (uuid_.isNil()) {
    logger_->log_error("NiFi Provenance event has a nil UUID and can not be loaded from the repository");
    return false;
  }

  std::string value;
  if (!repo->Get(getUUIDStr(), value)) {
    logger_->log_error("NiFi Provenance Store event %s can not be found", getUUIDStr());
    return false;
  }
  logger_->log_debug("NiFi Provenance Read event %s", getUUIDStr());

  org::apache::nifi::minifi::io::BufferStream stream(value);
  const bool ret = deserialize(stream);
  // The casts make the argument types match the %llu / %d conversion specifiers exactly.
  const auto stream_size = static_cast<unsigned long long>(stream.size());  // NOLINT(runtime/int)
  const auto event_type = static_cast<int>(_eventType);
  if (ret) {
    logger_->log_debug("NiFi Provenance retrieve event %s size %llu eventType %d success", getUUIDStr(), stream_size, event_type);
  } else {
    logger_->log_debug("NiFi Provenance retrieve event %s size %llu eventType %d fail", getUUIDStr(), stream_size, event_type);
  }
  return ret;
}
/**
 * Serializes this record into the given output stream.
 *
 * Fields are written in this fixed order: record UUID, event type (32 bit),
 * event time, entry date, event duration and lineage start date (64 bit each,
 * in milliseconds), component id, component type, flow UUID, details, the
 * attribute count followed by each key/value pair, content full path, size,
 * offset and source queue identifier. Depending on the event type this is
 * followed by the parent and child UUID lists (FORK, CLONE, JOIN), the transit
 * URI (SEND, FETCH), or the transit URI plus the source system flow file
 * identifier (RECEIVE).
 *
 * @param output_stream stream receiving the serialized bytes
 * @return true if every field was written; false as soon as one write fails
 */
bool ProvenanceEventRecord::serialize(io::OutputStream& output_stream) {
  // Fixed-width writes succeed only when the stream reports exactly the field width.
  const auto put_4_bytes = [&output_stream](const auto& value) {
    return output_stream.write(value) == 4;
  };
  const auto put_8_bytes = [&output_stream](const auto& value) {
    return output_stream.write(value) == 8;
  };
  // Variable-width writes (strings, identifiers) succeed on any non-zero, non-error result.
  const auto put_any = [&output_stream](const auto& value) {
    const auto written = output_stream.write(value);
    return !(written == 0 || io::isError(written));
  };
  // Converts a time point to milliseconds since the clock's epoch.
  const auto epoch_ms = [](const auto& time_point) -> uint64_t {
    return std::chrono::duration_cast<std::chrono::milliseconds>(time_point.time_since_epoch()).count();
  };

  if (!put_any(uuid_)) { return false; }

  const uint32_t event_type_code = _eventType;
  if (!put_4_bytes(event_type_code)) { return false; }

  if (!put_8_bytes(epoch_ms(_eventTime))) { return false; }
  if (!put_8_bytes(epoch_ms(_entryDate))) { return false; }

  const uint64_t duration_ms = _eventDuration.count();
  if (!put_8_bytes(duration_ms)) { return false; }

  if (!put_8_bytes(epoch_ms(_lineageStartDate))) { return false; }

  if (!put_any(_componentId)) { return false; }
  if (!put_any(_componentType)) { return false; }
  if (!put_any(flow_uuid_)) { return false; }
  if (!put_any(_details)) { return false; }

  // flow attributes: count first, then every key/value pair
  const auto attribute_count = gsl::narrow<uint32_t>(_attributes.size());
  if (!put_4_bytes(attribute_count)) { return false; }
  for (const auto& attribute : _attributes) {
    if (!put_any(attribute.first)) { return false; }
    if (!put_any(attribute.second)) { return false; }
  }

  if (!put_any(_contentFullPath)) { return false; }
  if (!put_8_bytes(_size)) { return false; }
  if (!put_8_bytes(_offset)) { return false; }
  if (!put_any(_sourceQueueIdentifier)) { return false; }

  // event-type specific trailer
  switch (_eventType) {
    case ProvenanceEventRecord::FORK:
    case ProvenanceEventRecord::CLONE:
    case ProvenanceEventRecord::JOIN: {
      // parent UUIDs, then child UUIDs, each list prefixed with its length
      const auto parent_count = gsl::narrow<uint32_t>(_parentUuids.size());
      if (!put_4_bytes(parent_count)) { return false; }
      for (const auto& parent_uuid : _parentUuids) {
        if (!put_any(parent_uuid)) { return false; }
      }
      const auto child_count = gsl::narrow<uint32_t>(_childrenUuids.size());
      if (!put_4_bytes(child_count)) { return false; }
      for (const auto& child_uuid : _childrenUuids) {
        if (!put_any(child_uuid)) { return false; }
      }
      break;
    }
    case ProvenanceEventRecord::SEND:
    case ProvenanceEventRecord::FETCH: {
      if (!put_any(_transitUri)) { return false; }
      break;
    }
    case ProvenanceEventRecord::RECEIVE: {
      if (!put_any(_transitUri)) { return false; }
      if (!put_any(_sourceSystemFlowFileIdentifier)) { return false; }
      break;
    }
    default:
      break;
  }
  return true;
}
/**
 * Restores this record from the given input stream.
 *
 * Fields are read in the same order in which serialize() writes them: record
 * UUID, event type (32 bit), event time, entry date, event duration and
 * lineage start date (64 bit each, in milliseconds), component id, component
 * type, flow UUID, details, the attribute count followed by each key/value
 * pair, content full path, size, offset and source queue identifier, and then
 * the event-type specific trailer (UUID lists for FORK/CLONE/JOIN, transit URI
 * for SEND/FETCH, transit URI plus source system flow file identifier for
 * RECEIVE).
 *
 * Fields read before a failure remain assigned on this object.
 *
 * @param input_stream stream holding a serialized record
 * @return true if every field was read; false on the first failed read or if
 *         the stored event type is not a known ProvenanceEventType
 */
bool ProvenanceEventRecord::deserialize(io::InputStream &input_stream) {
  // Fixed-width reads succeed only when the stream reports exactly the field width.
  const auto get_4_bytes = [&input_stream](auto& target) {
    return input_stream.read(target) == 4;
  };
  const auto get_8_bytes = [&input_stream](auto& target) {
    return input_stream.read(target) == 8;
  };
  // Variable-width reads (strings, identifiers) succeed on any non-zero, non-error result.
  const auto get_any = [&input_stream](auto& target) {
    const auto consumed = input_stream.read(target);
    return !(consumed == 0 || io::isError(consumed));
  };
  // Converts milliseconds since the epoch back to a system clock time point.
  const auto from_epoch_ms = [](uint64_t ms) {
    return std::chrono::system_clock::time_point() + std::chrono::milliseconds(ms);
  };

  if (!get_any(uuid_)) { return false; }

  uint32_t raw_event_type = 0;
  if (!get_4_bytes(raw_event_type)) { return false; }
  // REPLAY is the last event type (ProvenanceEventTypeStr has REPLAY + 1 entries); anything
  // larger comes from corrupted data and must not be turned into an enum value.
  if (raw_event_type > static_cast<uint32_t>(ProvenanceEventRecord::REPLAY)) {
    logger_->log_error("NiFi Provenance event has unknown event type %u", raw_event_type);
    return false;
  }
  _eventType = static_cast<ProvenanceEventRecord::ProvenanceEventType>(raw_event_type);

  uint64_t event_time_ms = 0;
  if (!get_8_bytes(event_time_ms)) { return false; }
  _eventTime = from_epoch_ms(event_time_ms);

  uint64_t entry_date_ms = 0;
  if (!get_8_bytes(entry_date_ms)) { return false; }
  _entryDate = from_epoch_ms(entry_date_ms);

  uint64_t event_duration_ms = 0;
  if (!get_8_bytes(event_duration_ms)) { return false; }
  _eventDuration = std::chrono::milliseconds(event_duration_ms);

  uint64_t lineage_start_date_ms = 0;
  if (!get_8_bytes(lineage_start_date_ms)) { return false; }
  _lineageStartDate = from_epoch_ms(lineage_start_date_ms);

  if (!get_any(_componentId)) { return false; }
  if (!get_any(_componentType)) { return false; }
  if (!get_any(flow_uuid_)) { return false; }
  if (!get_any(_details)) { return false; }

  // flow attributes: count first, then every key/value pair
  uint32_t attribute_count = 0;
  if (!get_4_bytes(attribute_count)) { return false; }
  for (uint32_t idx = 0; idx < attribute_count; ++idx) {
    std::string key;
    if (!get_any(key)) { return false; }
    std::string value;
    if (!get_any(value)) { return false; }
    _attributes[key] = std::move(value);
  }

  if (!get_any(_contentFullPath)) { return false; }
  if (!get_8_bytes(_size)) { return false; }
  if (!get_8_bytes(_offset)) { return false; }
  if (!get_any(_sourceQueueIdentifier)) { return false; }

  // event-type specific trailer
  switch (_eventType) {
    case ProvenanceEventRecord::FORK:
    case ProvenanceEventRecord::CLONE:
    case ProvenanceEventRecord::JOIN: {
      // parent UUIDs, then child UUIDs, each list prefixed with its length
      uint32_t parent_count = 0;
      if (!get_4_bytes(parent_count)) { return false; }
      for (uint32_t idx = 0; idx < parent_count; ++idx) {
        utils::Identifier parent_uuid;
        if (!get_any(parent_uuid)) { return false; }
        addParentUuid(parent_uuid);
      }
      uint32_t child_count = 0;
      if (!get_4_bytes(child_count)) { return false; }
      for (uint32_t idx = 0; idx < child_count; ++idx) {
        utils::Identifier child_uuid;
        if (!get_any(child_uuid)) { return false; }
        addChildUuid(child_uuid);
      }
      break;
    }
    case ProvenanceEventRecord::SEND:
    case ProvenanceEventRecord::FETCH: {
      if (!get_any(_transitUri)) { return false; }
      break;
    }
    case ProvenanceEventRecord::RECEIVE: {
      if (!get_any(_transitUri)) { return false; }
      if (!get_any(_sourceSystemFlowFileIdentifier)) { return false; }
      break;
    }
    default:
      break;
  }
  return true;
}
void ProvenanceReporter::commit() {
if (repo_->isNoop()) {
return;
}
if (repo_->isFull()) {
logger_->log_debug("Provenance Repository is full");
return;
}
std::vector<std::pair<std::string, std::unique_ptr<io::BufferStream>>> flowData;
for (auto& event : _events) {
std::unique_ptr<io::BufferStream> stramptr(new io::BufferStream());
event->serialize(*stramptr);
flowData.emplace_back(event->getUUIDStr(), std::move(stramptr));
}
repo_->MultiPut(flowData);
}
/**
 * Reports a CREATE event for the given flow file.
 *
 * @param flow   flow file the event is allocated for
 * @param detail details text stored on the event
 *
 * Nothing is added when allocate() yields no event.
 */
void ProvenanceReporter::create(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail) {
  auto record = allocate(ProvenanceEventRecord::CREATE, flow);
  if (!record) {
    return;
  }
  record->setDetails(detail);
  add(record);
}
/**
 * Reports a ROUTE event for the given flow file.
 *
 * @param flow               flow file the event is allocated for
 * @param relation           relationship whose name is stored on the event
 * @param detail             details text stored on the event
 * @param processingDuration duration stored as the event duration
 *
 * Nothing is added when allocate() yields no event.
 */
void ProvenanceReporter::route(const std::shared_ptr<core::FlowFile>& flow, const core::Relationship& relation, const std::string& detail, std::chrono::milliseconds processingDuration) {
  auto record = allocate(ProvenanceEventRecord::ROUTE, flow);
  if (!record) {
    return;
  }
  record->setDetails(detail);
  record->setRelationship(relation.getName());
  record->setEventDuration(processingDuration);
  add(record);
}
/**
 * Reports an ATTRIBUTES_MODIFIED event for the given flow file.
 *
 * @param flow   flow file the event is allocated for
 * @param detail details text stored on the event
 *
 * Nothing is added when allocate() yields no event.
 */
void ProvenanceReporter::modifyAttributes(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail) {
  auto record = allocate(ProvenanceEventRecord::ATTRIBUTES_MODIFIED, flow);
  if (!record) {
    return;
  }
  record->setDetails(detail);
  add(record);
}
/**
 * Reports a CONTENT_MODIFIED event for the given flow file.
 *
 * @param flow               flow file the event is allocated for
 * @param detail             details text stored on the event
 * @param processingDuration duration stored as the event duration
 *
 * Nothing is added when allocate() yields no event.
 */
void ProvenanceReporter::modifyContent(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail, std::chrono::milliseconds processingDuration) {
  auto record = allocate(ProvenanceEventRecord::CONTENT_MODIFIED, flow);
  if (!record) {
    return;
  }
  record->setDetails(detail);
  record->setEventDuration(processingDuration);
  add(record);
}
/**
 * Reports a CLONE event linking a parent flow file to its clone.
 *
 * @param parent flow file the event is allocated for; registered as the parent
 * @param child  flow file registered as the child
 *
 * Nothing is added when allocate() yields no event.
 */
void ProvenanceReporter::clone(const std::shared_ptr<core::FlowFile>& parent, const std::shared_ptr<core::FlowFile>& child) {
  auto record = allocate(ProvenanceEventRecord::CLONE, parent);
  if (!record) {
    return;
  }
  record->addChildFlowFile(child);
  record->addParentFlowFile(parent);
  add(record);
}
/**
 * Reports a JOIN event that merges several parent flow files into one child.
 *
 * @param parents            flow files registered as parents of the event
 * @param child              flow file the event is allocated for; registered as the child
 * @param detail             details text stored on the event
 * @param processingDuration duration stored as the event duration
 *
 * Nothing is added when allocate() yields no event.
 */
void ProvenanceReporter::join(const std::vector<std::shared_ptr<core::FlowFile>>& parents, const std::shared_ptr<core::FlowFile>& child,
    const std::string& detail, std::chrono::milliseconds processingDuration) {
  auto record = allocate(ProvenanceEventRecord::JOIN, child);
  if (!record) {
    return;
  }
  record->addChildFlowFile(child);
  for (const auto& source_flow : parents) {
    record->addParentFlowFile(source_flow);
  }
  record->setDetails(detail);
  record->setEventDuration(processingDuration);
  add(record);
}
/**
 * Reports a FORK event that splits one parent flow file into several children.
 *
 * @param children           flow files registered as children of the event
 * @param parent             flow file the event is allocated for; registered as the parent
 * @param detail             details text stored on the event
 * @param processingDuration duration stored as the event duration
 *
 * Nothing is added when allocate() yields no event.
 */
void ProvenanceReporter::fork(const std::vector<std::shared_ptr<core::FlowFile>>& children, const std::shared_ptr<core::FlowFile>& parent,
    const std::string& detail, std::chrono::milliseconds processingDuration) {
  auto record = allocate(ProvenanceEventRecord::FORK, parent);
  if (!record) {
    return;
  }
  record->addParentFlowFile(parent);
  for (const auto& derived_flow : children) {
    record->addChildFlowFile(derived_flow);
  }
  record->setDetails(detail);
  record->setEventDuration(processingDuration);
  add(record);
}
/**
 * Reports an EXPIRE event for the given flow file.
 *
 * @param flow   flow file the event is allocated for
 * @param detail details text stored on the event
 *
 * Nothing is added when allocate() yields no event.
 */
void ProvenanceReporter::expire(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail) {
  auto record = allocate(ProvenanceEventRecord::EXPIRE, flow);
  if (!record) {
    return;
  }
  record->setDetails(detail);
  add(record);
}
/**
 * Reports a DROP event for the given flow file.
 *
 * @param flow   flow file the event is allocated for
 * @param reason reason text; stored in the event details prefixed with "Discard reason: "
 *
 * Nothing is added when allocate() yields no event.
 */
void ProvenanceReporter::drop(const std::shared_ptr<core::FlowFile>& flow, const std::string& reason) {
  auto record = allocate(ProvenanceEventRecord::DROP, flow);
  if (!record) {
    return;
  }
  record->setDetails("Discard reason: " + reason);
  add(record);
}
/**
 * Reports a SEND event for the given flow file.
 *
 * @param flow               flow file the event is allocated for
 * @param transitUri         transit URI stored on the event
 * @param detail             details text stored on the event
 * @param processingDuration duration stored as the event duration
 * @param force              when false the event goes through add(); when true it bypasses
 *                           add() and is passed straight to repo_->storeElement(), unless
 *                           the repository reports that it is full
 *
 * Nothing happens when allocate() yields no event.
 */
void ProvenanceReporter::send(const std::shared_ptr<core::FlowFile>& flow, const std::string& transitUri, const std::string& detail, std::chrono::milliseconds processingDuration, bool force) {
  auto record = allocate(ProvenanceEventRecord::SEND, flow);
  if (!record) {
    return;
  }
  record->setTransitUri(transitUri);
  record->setDetails(detail);
  record->setEventDuration(processingDuration);
  if (force) {
    // forced events are stored immediately, but only while the repository has room
    if (!repo_->isFull()) {
      repo_->storeElement(record);
    }
    return;
  }
  add(record);
}
/**
 * Reports a RECEIVE event for the given flow file.
 *
 * @param flow                           flow file the event is allocated for
 * @param transitUri                     transit URI stored on the event
 * @param sourceSystemFlowFileIdentifier source system flow file identifier stored on the event
 * @param detail                         details text stored on the event
 * @param processingDuration             duration stored as the event duration
 *
 * Nothing is added when allocate() yields no event.
 */
void ProvenanceReporter::receive(const std::shared_ptr<core::FlowFile>& flow,
    const std::string& transitUri,
    const std::string& sourceSystemFlowFileIdentifier,
    const std::string& detail,
    std::chrono::milliseconds processingDuration) {
  auto record = allocate(ProvenanceEventRecord::RECEIVE, flow);
  if (!record) {
    return;
  }
  record->setTransitUri(transitUri);
  record->setDetails(detail);
  record->setEventDuration(processingDuration);
  record->setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier);
  add(record);
}
/**
 * Reports a FETCH event for the given flow file.
 *
 * @param flow               flow file the event is allocated for
 * @param transitUri         transit URI stored on the event
 * @param detail             details text stored on the event
 * @param processingDuration duration stored as the event duration
 *
 * Nothing is added when allocate() yields no event.
 */
void ProvenanceReporter::fetch(const std::shared_ptr<core::FlowFile>& flow, const std::string& transitUri, const std::string& detail, std::chrono::milliseconds processingDuration) {
  auto record = allocate(ProvenanceEventRecord::FETCH, flow);
  if (!record) {
    return;
  }
  record->setTransitUri(transitUri);
  record->setDetails(detail);
  record->setEventDuration(processingDuration);
  add(record);
}
} // namespace org::apache::nifi::minifi::provenance