in extensions/libarchive/CompressContent.cpp [84:203]
void CompressContent::processFlowFile(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<core::ProcessSession>& session) {
session->remove(flowFile);
io::CompressionFormat compressFormat;
if (compressFormat_ == compress_content::ExtendedCompressionFormat::USE_MIME_TYPE) {
std::string attr;
flowFile->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, attr);
if (attr.empty()) {
logger_->log_error("No %s attribute existed for the flow, route to failure", std::string(core::SpecialFlowAttribute::MIME_TYPE));
session->transfer(flowFile, Failure);
return;
}
auto search = compressionFormatMimeTypeMap_.find(attr);
if (search != compressionFormatMimeTypeMap_.end()) {
compressFormat = search->second;
} else {
logger_->log_info("Mime type of %s is not indicated a support format, route to success", attr);
session->transfer(flowFile, Success);
return;
}
} else {
compressFormat = *magic_enum::enum_cast<io::CompressionFormat>(magic_enum::enum_name(compressFormat_));
}
std::string mimeType = toMimeType(compressFormat);
// Validate
if (!encapsulateInTar_ && compressFormat != io::CompressionFormat::GZIP) {
logger_->log_error("non-TAR encapsulated format only supports GZIP compression");
session->transfer(flowFile, Failure);
return;
}
if (compressFormat == io::CompressionFormat::BZIP2 && archive_bzlib_version() == nullptr) {
logger_->log_error("%s compression format is requested, but the agent was compiled without BZip2 support", std::string{magic_enum::enum_name(compressFormat)});
session->transfer(flowFile, Failure);
return;
}
if ((compressFormat == io::CompressionFormat::LZMA || compressFormat == io::CompressionFormat::XZ_LZMA2) && archive_liblzma_version() == nullptr) {
logger_->log_error("%s compression format is requested, but the agent was compiled without LZMA support ", std::string{magic_enum::enum_name(compressFormat)});
session->transfer(flowFile, Failure);
return;
}
std::string fileExtension;
auto search = fileExtension_.find(compressFormat);
if (search != fileExtension_.end()) {
fileExtension = search->second;
}
std::shared_ptr<core::FlowFile> result = session->create(flowFile);
bool success = true;
if (encapsulateInTar_) {
std::function<int64_t(const std::shared_ptr<io::InputStream>&, const std::shared_ptr<io::OutputStream>&)> transformer;
if (compressMode_ == compress_content::CompressionMode::compress) {
std::string filename;
flowFile->getAttribute(core::SpecialFlowAttribute::FILENAME, filename);
transformer = [&, filename] (const std::shared_ptr<io::InputStream>& in, const std::shared_ptr<io::OutputStream>& out) -> int64_t {
io::WriteArchiveStreamImpl compressor(compressLevel_, compressFormat, out);
if (!compressor.newEntry({filename, in->size()})) {
return -1;
}
return internal::pipe(*in, compressor);
};
} else {
transformer = [&] (const std::shared_ptr<io::InputStream>& in, const std::shared_ptr<io::OutputStream>& out) -> int64_t {
io::ReadArchiveStreamImpl decompressor(in);
if (!decompressor.nextEntry()) {
success = false;
return 0; // prevents a session rollback
}
auto ret = internal::pipe(decompressor, *out);
if (ret < 0) {
success = false;
return 0; // prevents a session rollback
}
return ret;
};
}
session->write(result, [&] (const auto& out) {
return session->read(flowFile, [&] (const auto& in) {
return transformer(in, out);
});
});
} else {
CompressContent::GzipWriteCallback callback(compressMode_, compressLevel_, flowFile, session);
session->write(result, std::ref(callback));
success = callback.success_;
}
if (!success) {
logger_->log_error("Compress Content processing fail for the flow with UUID %s", flowFile->getUUIDStr());
session->transfer(flowFile, Failure);
session->remove(result);
} else {
std::string fileName;
result->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
if (compressMode_ == compress_content::CompressionMode::compress) {
session->putAttribute(result, core::SpecialFlowAttribute::MIME_TYPE, mimeType);
if (updateFileName_) {
if (encapsulateInTar_) {
fileName = fileName + TAR_EXT;
}
fileName = fileName + fileExtension;
session->putAttribute(result, core::SpecialFlowAttribute::FILENAME, fileName);
}
} else {
session->removeAttribute(result, core::SpecialFlowAttribute::MIME_TYPE);
if (updateFileName_) {
if (utils::StringUtils::endsWith(fileName, fileExtension)) {
fileName = fileName.substr(0, fileName.size() - fileExtension.size());
if (encapsulateInTar_ && utils::StringUtils::endsWith(fileName, TAR_EXT)) {
fileName = fileName.substr(0, fileName.size() - TAR_EXT.size());
}
session->putAttribute(result, core::SpecialFlowAttribute::FILENAME, fileName);
}
}
}
logger_->log_debug("Compress Content processing success for the flow with UUID %s name %s", result->getUUIDStr(), fileName);
session->transfer(result, Success);
}
}