void CompressContent::processFlowFile()

in extensions/libarchive/CompressContent.cpp [84:203]


// Compresses or decompresses a single flow file according to the processor configuration.
//
// The input flow file is removed from the session first. Every early-exit path below then
// transfers it explicitly: to Failure, or to Success when USE_MIME_TYPE is configured and its
// mime type names no known format. On the main path a child flow file is created from the input
// and the (de)compressed content is written into it. Afterwards, either
//  - the child is routed to Success, with mime.type set (compress) or removed (decompress), and
//    filename optionally updated; or
//  - the input is routed to Failure and the child is removed.
// Content-level (de)compression failures are routed to Failure rather than triggering a session
// rollback, for both the compress and the decompress direction.
//
// @param flowFile the incoming flow file; dereferenced unconditionally, so it must not be null
// @param session  the session flowFile belongs to; used for all routing, reads and writes
void CompressContent::processFlowFile(const std::shared_ptr<core::FlowFile>& flowFile, core::ProcessSession& session) {
  // On success only the newly created child is emitted, so the input is dropped up front.
  // NOTE(review): every early return below transfers flowFile after this remove(); this relies on
  // ProcessSession::transfer() re-admitting a removed flow file — confirm against ProcessSession.
  session.remove(flowFile);

  // Resolve the concrete compression format, either from the mime.type attribute or from config.
  io::CompressionFormat compressFormat{};
  if (compressFormat_ == compress_content::ExtendedCompressionFormat::USE_MIME_TYPE) {
    std::string attr;
    flowFile->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, attr);
    if (attr.empty()) {
      logger_->log_error("No {} attribute existed for the flow, route to failure", core::SpecialFlowAttribute::MIME_TYPE);
      session.transfer(flowFile, Failure);
      return;
    }
    auto search = compressionFormatMimeTypeMap_.find(attr);
    if (search != compressionFormatMimeTypeMap_.end()) {
      compressFormat = search->second;
    } else {
      // Unknown mime types are passed through untouched rather than treated as errors.
      logger_->log_info("Mime type of {} is not indicated a support format, route to success", attr);
      session.transfer(flowFile, Success);
      return;
    }
  } else {
    // The configured (extended) enum value is converted to io::CompressionFormat by name.
    // enum_cast yields an empty optional when no enumerator has that name, so check before use.
    const auto format = magic_enum::enum_cast<io::CompressionFormat>(magic_enum::enum_name(compressFormat_));
    if (!format) {
      logger_->log_error("Configured compression format {} has no matching compression format, route to failure", magic_enum::enum_name(compressFormat_));
      session.transfer(flowFile, Failure);
      return;
    }
    compressFormat = *format;
  }
  std::string mimeType = toMimeType(compressFormat);

  // Validate that the requested format is usable with this configuration and this build.
  if (!encapsulateInTar_ && compressFormat != io::CompressionFormat::GZIP) {
    logger_->log_error("non-TAR encapsulated format only supports GZIP compression");
    session.transfer(flowFile, Failure);
    return;
  }
  if (compressFormat == io::CompressionFormat::BZIP2 && archive_bzlib_version() == nullptr) {
    logger_->log_error("{} compression format is requested, but the agent was compiled without BZip2 support", magic_enum::enum_name(compressFormat));
    session.transfer(flowFile, Failure);
    return;
  }
  if ((compressFormat == io::CompressionFormat::LZMA || compressFormat == io::CompressionFormat::XZ_LZMA2) && archive_liblzma_version() == nullptr) {
    logger_->log_error("{} compression format is requested, but the agent was compiled without LZMA support", magic_enum::enum_name(compressFormat));
    session.transfer(flowFile, Failure);
    return;
  }

  // Stays empty when the format has no registered extension; the filename logic below then
  // appends/strips nothing for it.
  std::string fileExtension;
  auto search = fileExtension_.find(compressFormat);
  if (search != fileExtension_.end()) {
    fileExtension = search->second;
  }
  std::shared_ptr<core::FlowFile> result = session.create(flowFile.get());
  // Set to false by the content callbacks on a (de)compression failure. They then return 0
  // instead of a negative value, so the session is not rolled back and the input can be routed
  // to Failure below.
  bool success = true;
  if (encapsulateInTar_) {
    std::function<int64_t(const std::shared_ptr<io::InputStream>&, const std::shared_ptr<io::OutputStream>&)> transformer;

    if (compressMode_ == compress_content::CompressionMode::compress) {
      std::string filename;
      flowFile->getAttribute(core::SpecialFlowAttribute::FILENAME, filename);
      transformer = [&, filename] (const std::shared_ptr<io::InputStream>& in, const std::shared_ptr<io::OutputStream>& out) -> int64_t {
        io::WriteArchiveStreamImpl compressor(compressLevel_, compressFormat, out);
        // The whole input becomes a single archive entry named after the input's filename.
        if (!compressor.newEntry({filename, in->size()})) {
          success = false;
          return 0;  // prevents a session rollback
        }
        const auto ret = internal::pipe(*in, compressor);
        if (ret < 0) {
          success = false;
          return 0;  // prevents a session rollback
        }
        return ret;
      };
    } else {
      transformer = [&] (const std::shared_ptr<io::InputStream>& in, const std::shared_ptr<io::OutputStream>& out) -> int64_t {
        io::ReadArchiveStreamImpl decompressor(in);
        // Only the first archive entry is extracted into the output.
        if (!decompressor.nextEntry()) {
          success = false;
          return 0;  // prevents a session rollback
        }
        const auto ret = internal::pipe(decompressor, *out);
        if (ret < 0) {
          success = false;
          return 0;  // prevents a session rollback
        }
        return ret;
      };
    }
    // Stream the input content straight into the child's content through the transformer.
    session.write(result, [&] (const auto& out) {
      return session.read(flowFile, [&] (const auto& in) {
        return transformer(in, out);
      });
    });
  } else {
    // Plain (non-TAR) mode: validated above to be GZIP only.
    CompressContent::GzipWriteCallback callback(compressMode_, compressLevel_, flowFile, session);
    session.write(result, std::ref(callback));
    success = callback.success_;
  }

  if (!success) {
    logger_->log_error("Compress Content processing fail for the flow with UUID {}", flowFile->getUUIDStr());
    session.transfer(flowFile, Failure);
    session.remove(result);
  } else {
    std::string fileName;
    result->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
    if (compressMode_ == compress_content::CompressionMode::compress) {
      session.putAttribute(*result, core::SpecialFlowAttribute::MIME_TYPE, mimeType);
      if (updateFileName_) {
        // e.g. "name" -> "name" + TAR_EXT + <format extension>
        if (encapsulateInTar_) {
          fileName = fileName + TAR_EXT;
        }
        fileName = fileName + fileExtension;
        session.putAttribute(*result, core::SpecialFlowAttribute::FILENAME, fileName);
      }
    } else {
      session.removeAttribute(*result, core::SpecialFlowAttribute::MIME_TYPE);
      if (updateFileName_) {
        // Strip the format extension and then, in TAR mode, a trailing TAR_EXT. The filename is
        // left unchanged when it does not end with the format extension.
        if (utils::string::endsWith(fileName, fileExtension)) {
          fileName = fileName.substr(0, fileName.size() - fileExtension.size());
          if (encapsulateInTar_ && utils::string::endsWith(fileName, TAR_EXT)) {
            fileName = fileName.substr(0, fileName.size() - TAR_EXT.size());
          }
          session.putAttribute(*result, core::SpecialFlowAttribute::FILENAME, fileName);
        }
      }
    }
    logger_->log_debug("Compress Content processing success for the flow with UUID {} name {}", result->getUUIDStr(), fileName);
    session.transfer(result, Success);
  }
}