extensions/libarchive/MergeContent.h (332 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <deque>
#include <map>
#include <utility>
#include <vector>
#include <memory>
#include <string>
#include "ArchiveCommon.h"
#include "BinFiles.h"
#include "archive_entry.h"
#include "archive.h"
#include "SmartArchivePtrs.h"
#include "core/logging/LoggerFactory.h"
#include "core/PropertyDefinitionBuilder.h"
#include "minifi-cpp/core/PropertyValidator.h"
#include "serialization/FlowFileSerializer.h"
#include "utils/ArrayUtils.h"
#include "utils/gsl.h"
#include "utils/Export.h"
#include "io/Stream.h"
namespace org::apache::nifi::minifi::processors {
// String values accepted by the enumerated properties of the MergeContent processor.
namespace merge_content_options {
// Allowed values of the "Merge Strategy" property
inline constexpr std::string_view MERGE_STRATEGY_BIN_PACK{"Bin-Packing Algorithm"};
inline constexpr std::string_view MERGE_STRATEGY_DEFRAGMENT{"Defragment"};

// Allowed values of the "Merge Format" property
inline constexpr std::string_view MERGE_FORMAT_TAR_VALUE{"TAR"};
inline constexpr std::string_view MERGE_FORMAT_ZIP_VALUE{"ZIP"};
inline constexpr std::string_view MERGE_FORMAT_CONCAT_VALUE{"Binary Concatenation"};
inline constexpr std::string_view MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE{"FlowFile Stream, v3"};

// Allowed values of the "Delimiter Strategy" property
inline constexpr std::string_view DELIMITER_STRATEGY_FILENAME{"Filename"};
inline constexpr std::string_view DELIMITER_STRATEGY_TEXT{"Text"};

// Allowed values of the "Attribute Strategy" property
inline constexpr std::string_view ATTRIBUTE_STRATEGY_KEEP_COMMON{"Keep Only Common Attributes"};
inline constexpr std::string_view ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE{"Keep All Unique Attributes"};
}  // namespace merge_content_options
/**
 * Interface of the strategies that merge the flow files of a bin.
 */
class MergeBin {
 public:
  virtual ~MergeBin() = default;

  /**
   * Merges the flow files of a bin.
   * @param session process session the merge is carried out in
   * @param flows flow files of the bin
   * @param serializer serializer handed to the implementation for writing out the flow files
   * @param merge_flow flow file associated with the merge; presumably the one that receives the
   *                   merged content - confirm against the implementations
   */
  virtual void merge(core::ProcessSession &session,
      std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &merge_flow) = 0;
};
/**
 * Merge strategy configured with a header, a footer and a demarcator string. Its nested
 * WriteCallback writes the header, the serialized flow files separated by the demarcator,
 * and finally the footer to an output stream.
 */
class BinaryConcatenationMerge : public MergeBin {
 public:
  BinaryConcatenationMerge(std::string header, std::string footer, std::string demarcator);

  void merge(core::ProcessSession &session,
      std::deque<std::shared_ptr<core::FlowFile>>& flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile>& merge_flow) override;

  // Nested callback class used for writing the merged content to a stream
  class WriteCallback {
   public:
    WriteCallback(std::string &header, std::string &footer, std::string &demarcator,
        std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer)
        : header_(header),
          footer_(footer),
          demarcator_(demarcator),
          flows_(flows),
          serializer_(serializer) {
    }

    std::string &header_;
    std::string &footer_;
    std::string &demarcator_;
    std::deque<std::shared_ptr<core::FlowFile>> &flows_;
    FlowFileSerializer& serializer_;

    /**
     * Writes header, flow files (separated by the demarcator) and footer to the stream.
     * Empty header/footer/demarcator strings are skipped.
     * @param stream destination of the merged content
     * @return number of bytes written, -1 if writing a delimiter fails, or the negative
     *         value returned by the serializer if serializing a flow file fails
     */
    int64_t operator()(const std::shared_ptr<io::OutputStream>& stream) const {
      size_t total_bytes = 0;

      // Writes one delimiter string unless it is empty; yields false on a stream error
      const auto write_delimiter = [&stream, &total_bytes](const std::string& text) {
        if (text.empty()) {
          return true;
        }
        const auto written = stream->write(reinterpret_cast<const uint8_t*>(text.data()), text.size());
        if (io::isError(written)) {
          return false;
        }
        total_bytes += written;
        return true;
      };

      if (!write_delimiter(header_)) {
        return -1;
      }

      // The demarcator goes between flow files only, never before the first one
      bool demarcator_due = false;
      for (const auto& flow : flows_) {
        if (demarcator_due && !write_delimiter(demarcator_)) {
          return -1;
        }
        const auto serialized = serializer_.serialize(flow, stream);
        if (serialized < 0) {
          return serialized;
        }
        total_bytes += gsl::narrow<size_t>(serialized);
        demarcator_due = true;
      }

      if (!write_delimiter(footer_)) {
        return -1;
      }
      return gsl::narrow<int64_t>(total_bytes);
    }
  };

 private:
  std::string header_;
  std::string footer_;
  std::string demarcator_;
};
/**
 * Shared building blocks of the archive based merge formats: a stream adapter that writes into
 * an archive entry and the write callback that assembles the archive with libarchive.
 */
class ArchiveMerge {
 public:
  /**
   * Output stream that forwards the data it receives into an entry of an archive.
   * The entry header is written lazily, on the first call to write().
   */
  class ArchiveWriter : public io::StreamImpl, public io::OutputStream {
   public:
    ArchiveWriter(archive& arch, archive_entry& entry) : arch_(arch), entry_(entry) {}

    /**
     * Writes the entry header if it has not been written yet, then the data.
     * @param data buffer to write
     * @param size number of bytes in the buffer
     * @return number of bytes accepted by libarchive, or io::STREAM_ERROR on failure
     */
    size_t write(const uint8_t* data, const size_t size) override {
      if (!header_emitted_) {
        if (archive_write_header(&arch_, &entry_) != ARCHIVE_OK) {
          return io::STREAM_ERROR;
        }
        header_emitted_ = true;
      }
      size_t totalWrote = 0;
      size_t remaining = size;
      while (remaining > 0) {
        const auto ret = archive_write_data(&arch_, data + totalWrote, remaining);
        if (ret < 0) {
          return io::STREAM_ERROR;
        }
        const auto zret = gsl::narrow<size_t>(ret);
        if (zret == 0) {
          // nothing was accepted: stop instead of spinning forever
          break;
        }
        totalWrote += zret;
        remaining -= zret;
      }
      return totalWrote;
    }

   private:
    archive& arch_;
    archive_entry& entry_;
    bool header_emitted_{false};
  };

  // Nested callback class used for writing the archive to a stream
  class WriteCallback {
   public:
    WriteCallback(std::string_view merge_type, std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer)
        : merge_type_(merge_type),
          flows_(flows),
          serializer_(serializer) {
    }

    std::string merge_type_;
    std::deque<std::shared_ptr<core::FlowFile>> &flows_;
    std::shared_ptr<io::OutputStream> stream_ = nullptr;
    size_t size_ = 0;
    std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ArchiveMerge>::getLogger();
    FlowFileSerializer& serializer_;

    /**
     * Write callback registered with libarchive: forwards the archive bytes to stream_ and
     * adds the number of bytes written to size_.
     * @param context the WriteCallback instance that was passed to archive_write_open
     * @param buff bytes produced by libarchive
     * @param size number of bytes in buff
     * @return number of bytes written, or -1 on stream error
     */
    static la_ssize_t archive_write(struct archive* /*arch*/, void *context, const void *buff, size_t size) {
      auto* callback = static_cast<WriteCallback *>(context);
      const auto* data = static_cast<const uint8_t*>(buff);
      la_ssize_t totalWrote = 0;
      size_t remaining = size;
      while (remaining > 0) {
        const auto ret = callback->stream_->write(data + totalWrote, remaining);
        if (io::isError(ret)) {
          // libarchive expects us to return -1 on error
          return -1;
        }
        if (ret == 0) {
          break;
        }
        callback->size_ += ret;
        totalWrote += static_cast<la_ssize_t>(ret);
        remaining -= ret;
      }
      return totalWrote;
    }

    /**
     * Builds the archive: one entry per flow file, named after the flow file's filename attribute.
     * @param stream destination of the archive bytes
     * @return the value of size_ after the archive has been closed, -1 if the archive cannot be
     *         set up, opened or closed, or the negative value returned by the serializer
     */
    int64_t operator()(const std::shared_ptr<io::OutputStream>& stream) {
      const auto arch = archive_write_unique_ptr{archive_write_new()};
      if (arch.get() == nullptr) {
        logger_->log_error("Failed to allocate the archive writer for merge format {}", merge_type_);
        return -1;
      }

      int format_status = ARCHIVE_OK;
      if (merge_type_ == merge_content_options::MERGE_FORMAT_TAR_VALUE) {
        format_status = archive_write_set_format_pax_restricted(arch.get());  // tar format
      } else if (merge_type_ == merge_content_options::MERGE_FORMAT_ZIP_VALUE) {
        format_status = archive_write_set_format_zip(arch.get());  // zip format
      }
      if (format_status != ARCHIVE_OK) {
        logger_->log_error("Failed to set the archive format for merge format {}", merge_type_);
        return -1;
      }
      archive_write_set_bytes_per_block(arch.get(), 0);
      archive_write_add_filter_none(arch.get());

      // stream_ has to be set before opening: libarchive reaches it through archive_write()
      stream_ = stream;
      if (archive_write_open(arch.get(), this, nullptr, archive_write, nullptr) != ARCHIVE_OK) {
        logger_->log_error("Failed to open the archive for merge format {}", merge_type_);
        return -1;
      }

      for (const auto& flow : flows_) {
        auto entry = archive_entry_unique_ptr{archive_entry_new()};
        std::string fileName;
        flow->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
        archive_entry_set_pathname(entry.get(), fileName.c_str());
        archive_entry_set_size(entry.get(), gsl::narrow<la_int64_t>(flow->getSize()));
        archive_entry_set_mode(entry.get(), S_IFREG | 0755);
        if (merge_type_ == merge_content_options::MERGE_FORMAT_TAR_VALUE) {
          if (std::string perm; flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE, perm)) {
            try {
              const int perm_int = std::stoi(perm);
              logger_->log_debug("Merge Tar File {} permission {}", fileName, perm);
              archive_entry_set_perm(entry.get(), static_cast<mode_t>(perm_int));
            } catch (...) {
              // best effort: the entry keeps the default mode set above
              logger_->log_warn("Ignoring invalid tar permission '{}' of {}", perm, fileName);
            }
          }
        }
        const auto ret = serializer_.serialize(flow, std::make_shared<ArchiveWriter>(*arch, *entry));
        if (ret < 0) {
          return ret;
        }
      }

      // Close explicitly: the data libarchive emits on close is then counted in size_ before the
      // value is returned, and a failure is reported instead of being left to whatever cleanup
      // archive_write_unique_ptr performs.
      if (archive_write_close(arch.get()) != ARCHIVE_OK) {
        logger_->log_error("Failed to close the archive for merge format {}", merge_type_);
        return -1;
      }
      return gsl::narrow<int64_t>(size_);
    }
  };
};
/**
 * MergeBin implementation built on ArchiveMerge; merge() is defined outside of this header.
 */
class TarMerge : public ArchiveMerge, public MergeBin {
 public:
  void merge(
      core::ProcessSession &session,
      std::deque<std::shared_ptr<core::FlowFile>> &flows,
      FlowFileSerializer& serializer,
      const std::shared_ptr<core::FlowFile> &merge_flow) override;
};
/**
 * MergeBin implementation built on ArchiveMerge; merge() is defined outside of this header.
 */
class ZipMerge : public ArchiveMerge, public MergeBin {
 public:
  void merge(
      core::ProcessSession &session,
      std::deque<std::shared_ptr<core::FlowFile>> &flows,
      FlowFileSerializer& serializer,
      const std::shared_ptr<core::FlowFile> &merge_flow) override;
};
/**
 * Base class of the attribute merging strategies. It keeps a reference to the flow files of a
 * bundle; derived classes supply the per flow file step through processFlowFile().
 */
class AttributeMerger {
 public:
  explicit AttributeMerger(std::deque<std::shared_ptr<core::FlowFile>> &flows)
      : flows_(flows) {
  }

  // Entry point of the strategy; takes the session and the merged flow file (defined outside of this header)
  void mergeAttributes(core::ProcessSession &session, core::FlowFile& merge_flow);

  virtual ~AttributeMerger() = default;

 protected:
  // Returns the merged attributes as a name -> value map (defined outside of this header)
  std::map<std::string, std::string> getMergedAttributes();

  // Strategy hook: receives one flow file together with the attribute map built up so far
  virtual void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string> &merged_attributes) = 0;

  // Referenced, not owned: the deque has to outlive this object
  const std::deque<std::shared_ptr<core::FlowFile>> &flows_;
};
/**
 * AttributeMerger strategy; going by its name it backs the 'Keep Only Common Attributes'
 * option - the actual logic lives in processFlowFile(), defined outside of this header.
 */
class KeepOnlyCommonAttributesMerger : public AttributeMerger {
 public:
  explicit KeepOnlyCommonAttributesMerger(std::deque<std::shared_ptr<core::FlowFile>> &flows)
      : AttributeMerger(flows) {
  }

 protected:
  void processFlowFile(
      const std::shared_ptr<core::FlowFile> &flow_file,
      std::map<std::string, std::string> &merged_attributes) override;
};
/**
 * AttributeMerger strategy; going by its name it backs the 'Keep All Unique Attributes'
 * option - the actual logic lives in processFlowFile(), defined outside of this header.
 */
class KeepAllUniqueAttributesMerger : public AttributeMerger {
 public:
  explicit KeepAllUniqueAttributesMerger(std::deque<std::shared_ptr<core::FlowFile>> &flows)
      : AttributeMerger(flows) {
  }

 protected:
  void processFlowFile(
      const std::shared_ptr<core::FlowFile> &flow_file,
      std::map<std::string, std::string> &merged_attributes) override;

 private:
  // NOTE(review): presumably the names of attributes dropped because of conflicting values - confirm in the implementation
  std::vector<std::string> removed_attributes_;
};
/**
 * A processor that merges multiple correlated flow files into a single flow file
 *
 * Concepts:
 * - Batch size: represents the maximum number of flow files to be processed from the incoming relationship
 * - Bin (or bundle): represents a set of flow files that belong together defined by the processor properties. Correlated flow files are defined by the CorrelationAttributeName property which
 *   defines the attribute that provides the groupid for the bin the flow file belongs to
 * - Ready bin: when a bin reaches a limit defined by the maximum age or the maximum size, the bin becomes ready, and ready bins can be merged
 * - Group: a set of bins with the same groupid. In case a bin cannot accept a new flow file (e.g. it would go above its size limit), a new bin is created with this new flow file and added
 *   to the same group of bins
 */
class MergeContent : public processors::BinFiles {
 public:
  explicit MergeContent(const std::string& name, const utils::Identifier& uuid = {})
      : processors::BinFiles(name, uuid) {
    logger_ = core::logging::LoggerFactory<MergeContent>::getLogger(uuid_);
    // NOTE(review): this initial value differs from the default of the Merge Strategy property
    // (Bin-Packing Algorithm); presumably it is overwritten from the property in onSchedule - confirm
    mergeStrategy_ = merge_content_options::MERGE_STRATEGY_DEFRAGMENT;
    mergeFormat_ = merge_content_options::MERGE_FORMAT_CONCAT_VALUE;
    delimiterStrategy_ = merge_content_options::DELIMITER_STRATEGY_FILENAME;
    keepPath_ = false;
    attributeStrategy_ = merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON;
  }
  ~MergeContent() override = default;

  // Adjacent string literals are concatenated verbatim, so every sentence but the last ends with a space
  EXTENSIONAPI static constexpr const char* Description = "Merges a Group of FlowFiles together based on a user-defined strategy and packages them into a single FlowFile. "
      "MergeContent should be configured with only one incoming connection as it won't create grouped Flow Files. "
      "This processor updates the mime.type attribute as appropriate.";

  EXTENSIONAPI static constexpr auto MergeStrategy = core::PropertyDefinitionBuilder<2>::createProperty("Merge Strategy")
      .withDescription("Defragment or Bin-Packing Algorithm")
      .withAllowedValues({merge_content_options::MERGE_STRATEGY_DEFRAGMENT, merge_content_options::MERGE_STRATEGY_BIN_PACK})
      .withDefaultValue(merge_content_options::MERGE_STRATEGY_BIN_PACK)
      .build();
  EXTENSIONAPI static constexpr auto MergeFormat = core::PropertyDefinitionBuilder<4>::createProperty("Merge Format")
      .withDescription("Merge Format")
      .withAllowedValues({
          merge_content_options::MERGE_FORMAT_CONCAT_VALUE,
          merge_content_options::MERGE_FORMAT_TAR_VALUE,
          merge_content_options::MERGE_FORMAT_ZIP_VALUE,
          merge_content_options::MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE})
      .withDefaultValue(merge_content_options::MERGE_FORMAT_CONCAT_VALUE)
      .build();
  EXTENSIONAPI static constexpr auto CorrelationAttributeName = core::PropertyDefinitionBuilder<>::createProperty("Correlation Attribute Name")
      .withDescription("Correlation Attribute Name")
      .build();
  EXTENSIONAPI static constexpr auto DelimiterStrategy = core::PropertyDefinitionBuilder<2>::createProperty("Delimiter Strategy")
      .withDescription("Determines if Header, Footer, and Demarcator should point to files")
      .withAllowedValues({merge_content_options::DELIMITER_STRATEGY_FILENAME, merge_content_options::DELIMITER_STRATEGY_TEXT})
      .withDefaultValue(merge_content_options::DELIMITER_STRATEGY_FILENAME)
      .build();
  EXTENSIONAPI static constexpr auto Header = core::PropertyDefinitionBuilder<>::createProperty("Header File")
      .withDescription("Filename specifying the header to use")
      .build();
  EXTENSIONAPI static constexpr auto Footer = core::PropertyDefinitionBuilder<>::createProperty("Footer File")
      .withDescription("Filename specifying the footer to use")
      .build();
  EXTENSIONAPI static constexpr auto Demarcator = core::PropertyDefinitionBuilder<>::createProperty("Demarcator File")
      .withDescription("Filename specifying the demarcator to use")
      .build();
  EXTENSIONAPI static constexpr auto KeepPath = core::PropertyDefinitionBuilder<>::createProperty("Keep Path")
      .withDescription("If using the Zip or Tar Merge Format, specifies whether or not the FlowFiles' paths should be included in their entry")
      .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR)
      .withDefaultValue("false")
      .build();
  EXTENSIONAPI static constexpr auto AttributeStrategy = core::PropertyDefinitionBuilder<2>::createProperty("Attribute Strategy")
      .withDescription("Determines which FlowFile attributes should be added to the bundle. If 'Keep All Unique Attributes' is selected, "
          "any attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile "
          "(in which case neither, or none, of the conflicting attributes will be kept). If 'Keep Only Common Attributes' is selected, "
          "only the attributes that exist on all FlowFiles in the bundle, with the same value, will be preserved.")
      .withAllowedValues({merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON, merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE})
      .withDefaultValue(merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON)
      .build();
  // The properties of BinFiles followed by the ones declared above
  EXTENSIONAPI static constexpr auto Properties = utils::array_cat(BinFiles::Properties, std::to_array<core::PropertyReference>({
      MergeStrategy,
      MergeFormat,
      CorrelationAttributeName,
      DelimiterStrategy,
      KeepPath,
      Header,
      Footer,
      Demarcator,
      AttributeStrategy
  }));

  EXTENSIONAPI static constexpr auto Merge = core::RelationshipDefinition{"merged", "The FlowFile containing the merged content"};
  // The relationships of BinFiles followed by "merged"
  EXTENSIONAPI static constexpr auto Relationships = utils::array_cat(BinFiles::Relationships, std::array{Merge});

  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;

  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS

  void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override;
  void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
  void initialize() override;
  bool processBin(core::ProcessSession &session, std::unique_ptr<Bin> &bin) override;

 protected:
  // Returns a group ID representing a bin. This allows flow files to be binned into like groups
  std::string getGroupId(const std::shared_ptr<core::FlowFile>& flow) override;
  // Checks whether the defragment bin is valid
  static bool checkDefragment(std::unique_ptr<Bin> &bin);

 private:
  void validatePropertyOptions();

  std::string mergeStrategy_;
  std::string mergeFormat_;
  std::string correlationAttributeName_;
  bool keepPath_;
  std::string delimiterStrategy_;
  std::string header_;
  std::string footer_;
  std::string demarcator_;
  std::string headerContent_;
  std::string footerContent_;
  std::string demarcatorContent_;
  std::string attributeStrategy_;

  // Takes a path and returns a string; presumably the content of the file at that path - confirm in the implementation
  static std::string readContent(const std::string& path);
};
} // namespace org::apache::nifi::minifi::processors