in extensions/libarchive/MergeContent.cpp [197:281]
/**
 * Merges every flow file held by `bin` into one newly created flow file.
 *
 * Steps, in order:
 *  - reject any merge strategy other than defragment / bin-pack;
 *  - for defragment, validate the bin via checkDefragment() and order its
 *    flow files by ascending fragment index;
 *  - create the output flow file and populate its attributes according to
 *    the configured attribute strategy;
 *  - pick the merger / serializer / MIME type for the configured merge format
 *    and run the merge;
 *  - route the output to Merge and every input to Original.
 *
 * @param session session used to create, read, annotate and transfer flow files
 * @param bin     bin whose flow files are merged (sorted in place when defragmenting)
 * @return true when the merged flow file and all inputs were transferred;
 *         false when the strategy, attribute strategy or merge format is not
 *         supported, when the defragment check fails, or when the merge throws
 */
bool MergeContent::processBin(core::ProcessSession &session, std::unique_ptr<Bin> &bin) {
  const bool defragmenting = mergeStrategy_ == merge_content_options::MERGE_STRATEGY_DEFRAGMENT;
  if (!defragmenting && mergeStrategy_ != merge_content_options::MERGE_STRATEGY_BIN_PACK) {
    return false;
  }

  if (defragmenting) {
    // The fragment attributes have to be validated before they are used for ordering.
    if (!checkDefragment(bin)) {
      logger_->log_error("Merge Content check defgrament failed");
      return false;
    }

    // Orders two flow files by their fragment index attribute, ascending.
    // A single buffer is reused for both lookups, exactly as before.
    // NOTE(review): std::stoi throws on non-numeric input and this runs outside the
    // try block below - presumably checkDefragment() guarantees parsable indices; confirm.
    const auto byFragmentIndex = [] (const std::shared_ptr<core::FlowFile> &lhs, const std::shared_ptr<core::FlowFile> &rhs) {
      std::string attribute;
      lhs->getAttribute(BinFiles::FRAGMENT_INDEX_ATTRIBUTE, attribute);
      const int lhsIndex = std::stoi(attribute);
      rhs->getAttribute(BinFiles::FRAGMENT_INDEX_ATTRIBUTE, attribute);
      const int rhsIndex = std::stoi(attribute);
      return lhsIndex < rhsIndex;
    };
    std::deque<std::shared_ptr<core::FlowFile>> &fragments = bin->getFlowFile();
    std::sort(fragments.begin(), fragments.end(), byFragmentIndex);
  }

  std::shared_ptr<core::FlowFile> mergedFlowFile = session.create();
  // On every exit path, drop the output flow file again unless it has been transferred.
  auto discardUnlessTransferred = gsl::finally([&]() {
    if (!session.hasBeenTransferred(*mergedFlowFile)) {
      session.remove(mergedFlowFile);
    }
  });

  std::deque<std::shared_ptr<core::FlowFile>> &sources = bin->getFlowFile();

  if (attributeStrategy_ == merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON) {
    KeepOnlyCommonAttributesMerger(sources).mergeAttributes(session, *mergedFlowFile);
  } else if (attributeStrategy_ == merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE) {
    KeepAllUniqueAttributesMerger(sources).mergeAttributes(session, *mergedFlowFile);
  } else {
    logger_->log_error("Attribute strategy not supported {}", attributeStrategy_);
    return false;
  }

  // Serializers pull flow file content through the session.
  auto readFlowFile = [&] (const std::shared_ptr<core::FlowFile>& ff, const io::InputStreamCallback& cb) {
    return session.read(ff, cb);
  };

  const char* mimeType = nullptr;
  std::unique_ptr<MergeBin> merger;
  // Payload serialization is the default; only the FlowFile-v3 format replaces it.
  std::unique_ptr<minifi::FlowFileSerializer> serializer = std::make_unique<PayloadSerializer>(readFlowFile);

  if (mergeFormat_ == merge_content_options::MERGE_FORMAT_CONCAT_VALUE) {
    mimeType = "application/octet-stream";
    merger = std::make_unique<BinaryConcatenationMerge>(headerContent_, footerContent_, demarcatorContent_);
  } else if (mergeFormat_ == merge_content_options::MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE) {
    mimeType = "application/flowfile-v3";
    // Header, demarcator and footer are ignored for this format.
    merger = std::make_unique<BinaryConcatenationMerge>("", "", "");
    serializer = std::make_unique<FlowFileV3Serializer>(readFlowFile);
  } else if (mergeFormat_ == merge_content_options::MERGE_FORMAT_TAR_VALUE) {
    mimeType = "application/tar";
    merger = std::make_unique<TarMerge>();
  } else if (mergeFormat_ == merge_content_options::MERGE_FORMAT_ZIP_VALUE) {
    mimeType = "application/zip";
    merger = std::make_unique<ZipMerge>();
  } else {
    logger_->log_error("Merge format not supported {}", mergeFormat_);
    return false;
  }

  // Any exception from the merge or from setting the MIME type is logged and reported as failure.
  try {
    merger->merge(session, sources, *serializer, mergedFlowFile);
    session.putAttribute(*mergedFlowFile, core::SpecialFlowAttribute::MIME_TYPE, mimeType);
  } catch (const std::exception& ex) {
    logger_->log_error("Merge Content merge catch exception, type: {}, what: {}", typeid(ex).name(), ex.what());
    return false;
  } catch (...) {
    logger_->log_error("Merge Content merge catch exception, type: {}", getCurrentExceptionTypeName());
    return false;
  }

  session.putAttribute(*mergedFlowFile, BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(bin->getSize()));

  // Merge succeeded: route the output first, then each of the inputs.
  session.transfer(mergedFlowFile, Merge);
  for (const auto& source : sources) {
    session.transfer(source, Original);
  }

  logger_->log_debug("Merge FlowFile record UUID {}, payload length {}", mergedFlowFile->getUUIDStr(), mergedFlowFile->getSize());
  return true;
}