bool MergeContent::processBin()

in extensions/libarchive/MergeContent.cpp [197:281]


/**
 * Merges the flow files held by `bin` into one newly created flow file.
 *
 * On success the merged flow file is transferred to the Merge relationship, every flow file
 * of the bin is transferred to the Original relationship, and true is returned.
 *
 * Returns false, without transferring anything, when:
 *  - the configured merge strategy is neither Defragment nor Bin-Packing,
 *  - the Defragment strategy is configured and checkDefragment() rejects the bin,
 *  - the configured attribute strategy or merge format is not one of the handled values,
 *  - the merge itself (or setting the MIME type) throws.
 * On every return path taken after the merged flow file was created, it is removed from the
 * session again unless it has already been transferred.
 *
 * NOTE(review): std::stoi in the fragment sort throws if a fragment index attribute is missing
 * or not numeric, and that exception is not caught here. Presumably checkDefragment() has
 * already validated the indices - confirm.
 *
 * @param session session used to create, read, annotate, transfer and remove flow files
 * @param bin     the bin whose flow files are merged; with the Defragment strategy its flow
 *                files are reordered in place by ascending fragment index
 * @return true if the bin was merged and all flow files were transferred, false otherwise
 */
bool MergeContent::processBin(core::ProcessSession &session, std::unique_ptr<Bin> &bin) {
  if (mergeStrategy_ != merge_content_options::MERGE_STRATEGY_DEFRAGMENT && mergeStrategy_ != merge_content_options::MERGE_STRATEGY_BIN_PACK)
    return false;

  if (mergeStrategy_ == merge_content_options::MERGE_STRATEGY_DEFRAGMENT) {
    // check the flowfile fragment values
    if (!checkDefragment(bin)) {
      logger_->log_error("Merge Content check defragment failed");
      return false;
    }
    // Every lookup gets its own buffer: reusing a single string for both operands of the
    // comparison would let a lookup that did not write to it silently compare against the
    // other flow file's index.
    const auto fragmentIndexOf = [] (const std::shared_ptr<core::FlowFile> &flow) {
      std::string index;
      flow->getAttribute(BinFiles::FRAGMENT_INDEX_ATTRIBUTE, index);
      return std::stoi(index);
    };
    // sort the flowfiles by ascending fragment index
    std::deque<std::shared_ptr<core::FlowFile>> &flows = bin->getFlowFile();
    std::sort(flows.begin(), flows.end(),
        [&fragmentIndexOf] (const std::shared_ptr<core::FlowFile> &first, const std::shared_ptr<core::FlowFile> &second) {
          const int indexFirst = fragmentIndexOf(first);
          const int indexSecond = fragmentIndexOf(second);
          return indexFirst < indexSecond;
        });
  }

  std::shared_ptr<core::FlowFile> merge_flow = session.create();
  // Scope guard: whichever way this function is left, drop the merged flow file again
  // unless it has been transferred.
  auto removeMergeFlow = gsl::finally([&](){
    if (!session.hasBeenTransferred(*merge_flow)) {
      session.remove(merge_flow);
    }
  });
  if (attributeStrategy_ == merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON) {
    KeepOnlyCommonAttributesMerger(bin->getFlowFile()).mergeAttributes(session, *merge_flow);
  } else if (attributeStrategy_ == merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE) {
    KeepAllUniqueAttributesMerger(bin->getFlowFile()).mergeAttributes(session, *merge_flow);
  } else {
    logger_->log_error("Attribute strategy not supported {}", attributeStrategy_);
    return false;
  }

  // Lets the serializers read flow file content through this session.
  auto flowFileReader = [&] (const std::shared_ptr<core::FlowFile>& ff, const io::InputStreamCallback& cb) {
    return session.read(ff, cb);
  };

  // Pick the merger, the serializer and the resulting MIME type for the configured format.
  // The payload serializer is the default; only the FlowFile Stream v3 format replaces it.
  const char* mimeType = nullptr;
  std::unique_ptr<MergeBin> mergeBin;
  std::unique_ptr<minifi::FlowFileSerializer> serializer = std::make_unique<PayloadSerializer>(flowFileReader);
  if (mergeFormat_ == merge_content_options::MERGE_FORMAT_CONCAT_VALUE) {
    mergeBin = std::make_unique<BinaryConcatenationMerge>(headerContent_, footerContent_, demarcatorContent_);
    mimeType = "application/octet-stream";
  } else if (mergeFormat_ == merge_content_options::MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE) {
    // disregard header, demarcator, footer
    mergeBin = std::make_unique<BinaryConcatenationMerge>("", "", "");
    serializer = std::make_unique<FlowFileV3Serializer>(flowFileReader);
    mimeType = "application/flowfile-v3";
  } else if (mergeFormat_ == merge_content_options::MERGE_FORMAT_TAR_VALUE) {
    mergeBin = std::make_unique<TarMerge>();
    mimeType = "application/tar";
  } else if (mergeFormat_ == merge_content_options::MERGE_FORMAT_ZIP_VALUE) {
    mergeBin = std::make_unique<ZipMerge>();
    mimeType = "application/zip";
  } else {
    logger_->log_error("Merge format not supported {}", mergeFormat_);
    return false;
  }

  // Any exception raised while merging is logged and reported as a failed merge.
  try {
    mergeBin->merge(session, bin->getFlowFile(), *serializer, merge_flow);
    session.putAttribute(*merge_flow, core::SpecialFlowAttribute::MIME_TYPE, mimeType);
  } catch (const std::exception& ex) {
    logger_->log_error("Merge Content merge catch exception, type: {}, what: {}", typeid(ex).name(), ex.what());
    return false;
  } catch (...) {
    logger_->log_error("Merge Content merge catch exception, type: {}", getCurrentExceptionTypeName());
    return false;
  }
  session.putAttribute(*merge_flow, BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(bin->getSize()));

  // we successfully merged the flow
  session.transfer(merge_flow, Merge);
  std::deque<std::shared_ptr<core::FlowFile>> &flows = bin->getFlowFile();
  for (const auto& flow : flows) {
    session.transfer(flow, Original);
  }
  logger_->log_debug("Merge FlowFile record UUID {}, payload length {}", merge_flow->getUUIDStr(), merge_flow->getSize());

  return true;
}