extensions/libarchive/CompressContent.h (181 lines of code) (raw):

/** * @file CompressContent.h * CompressContent class declaration * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <cinttypes> #include <vector> #include <utility> #include <memory> #include <map> #include <string> #include "minifi-cpp/core/PropertyValidator.h" #include "archive_entry.h" #include "archive.h" #include "FlowFileRecord.h" #include "core/Processor.h" #include "core/ProcessSession.h" #include "core/Core.h" #include "core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" #include "core/logging/LoggerFactory.h" #include "io/ZlibStream.h" #include "utils/Enum.h" #include "utils/gsl.h" #include "utils/Export.h" #include "WriteArchiveStream.h" #include "ReadArchiveStream.h" namespace org::apache::nifi::minifi::processors::compress_content { enum class CompressionMode { compress, decompress }; enum class ExtendedCompressionFormat { GZIP, LZMA, XZ_LZMA2, BZIP2, USE_MIME_TYPE }; } // namespace org::apache::nifi::minifi::processors::compress_content namespace magic_enum::customize { using ExtendedCompressionFormat = org::apache::nifi::minifi::processors::compress_content::ExtendedCompressionFormat; template <> constexpr customize_t enum_name<ExtendedCompressionFormat>(ExtendedCompressionFormat value) noexcept { switch (value) { case ExtendedCompressionFormat::GZIP: return "gzip"; case ExtendedCompressionFormat::LZMA: return "lzma"; case ExtendedCompressionFormat::XZ_LZMA2: return "xz-lzma2"; case ExtendedCompressionFormat::BZIP2: return "bzip2"; case ExtendedCompressionFormat::USE_MIME_TYPE: return "use mime.type attribute"; } return invalid_tag; } } // namespace magic_enum::customize namespace org::apache::nifi::minifi::processors { class CompressContent : public core::ProcessorImpl { public: explicit CompressContent(const std::string_view name, const utils::Identifier& uuid = {}) : core::ProcessorImpl(name, uuid) { logger_ = core::logging::LoggerFactory<CompressContent>::getLogger(uuid_); } ~CompressContent() override = default; EXTENSIONAPI static constexpr const char* Description = "Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm " "and updates the mime.type attribute as appropriate"; EXTENSIONAPI static constexpr auto CompressMode = core::PropertyDefinitionBuilder<magic_enum::enum_count<compress_content::CompressionMode>()>::createProperty("Mode") .withDescription("Indicates whether the processor should compress content or decompress content.") .isRequired(true) .withDefaultValue(magic_enum::enum_name(compress_content::CompressionMode::compress)) .withAllowedValues(magic_enum::enum_names<compress_content::CompressionMode>()) .build(); EXTENSIONAPI static constexpr auto CompressLevel = core::PropertyDefinitionBuilder<>::createProperty("Compression Level") .withDescription("The compression level to use; this is valid only when using GZIP compression.") .isRequired(true) .withValidator(core::StandardPropertyValidators::INTEGER_VALIDATOR) .withDefaultValue("1") .build(); EXTENSIONAPI static constexpr auto CompressFormat = core::PropertyDefinitionBuilder<magic_enum::enum_count<compress_content::ExtendedCompressionFormat>()>::createProperty("Compression Format") .withDescription("The compression format to use.") .isRequired(false) .withDefaultValue(magic_enum::enum_name(compress_content::ExtendedCompressionFormat::USE_MIME_TYPE)) .withAllowedValues(magic_enum::enum_names<compress_content::ExtendedCompressionFormat>()) .build(); EXTENSIONAPI static constexpr auto UpdateFileName = core::PropertyDefinitionBuilder<>::createProperty("Update Filename") .withDescription("Determines if filename extension need to be updated") .isRequired(false) .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR) .withDefaultValue("false") .build(); EXTENSIONAPI static constexpr auto EncapsulateInTar = core::PropertyDefinitionBuilder<>::createProperty("Encapsulate in TAR") .withDescription("If true, on compression the FlowFile is added to a TAR archive and then compressed, " "and on decompression a compressed, TAR-encapsulated FlowFile is expected.\n" "If false, on compression the content of the FlowFile simply gets compressed, and on decompression a simple compressed content is expected.\n" "true is the behaviour compatible with older MiNiFi C++ versions, false is the behaviour compatible with NiFi.") .isRequired(false) .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR) .withDefaultValue("true") .build(); EXTENSIONAPI static constexpr auto BatchSize = core::PropertyDefinitionBuilder<>::createProperty("Batch Size") .withDescription("Maximum number of FlowFiles processed in a single session") .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR) .withDefaultValue("1") .build(); EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({ CompressMode, CompressLevel, CompressFormat, UpdateFileName, EncapsulateInTar, BatchSize }); EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "FlowFiles will be transferred to the success relationship after successfully being compressed or decompressed"}; EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "FlowFiles will be transferred to the failure relationship if they fail to compress/decompress"}; EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure}; EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED; EXTENSIONAPI static constexpr bool IsSingleThreaded = false; ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS static const std::string TAR_EXT; class GzipWriteCallback { public: GzipWriteCallback(compress_content::CompressionMode compress_mode, int compress_level, std::shared_ptr<core::FlowFile> flow, core::ProcessSession& session) : compress_mode_(compress_mode) , compress_level_(compress_level) , flow_(std::move(flow)) , session_(session) { } std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<CompressContent>::getLogger(); compress_content::CompressionMode compress_mode_; int compress_level_; std::shared_ptr<core::FlowFile> flow_; core::ProcessSession& session_; bool success_{false}; int64_t operator()(const std::shared_ptr<io::OutputStream>& output_stream) { std::shared_ptr<io::ZlibBaseStream> filterStream; if (compress_mode_ == compress_content::CompressionMode::compress) { filterStream = std::make_shared<io::ZlibCompressStream>(gsl::make_not_null(output_stream.get()), io::ZlibCompressionFormat::GZIP, compress_level_); } else { filterStream = std::make_shared<io::ZlibDecompressStream>(gsl::make_not_null(output_stream.get()), io::ZlibCompressionFormat::GZIP); } session_.read(flow_, [this, &filterStream](const std::shared_ptr<io::InputStream>& input_stream) -> int64_t { std::vector<std::byte> buffer(16 * 1024U); size_t read_size = 0; while (read_size < flow_->getSize()) { const auto ret = input_stream->read(buffer); if (io::isError(ret)) { return -1; } else if (ret == 0) { break; } else { const auto writeret = filterStream->write(gsl::make_span(buffer).subspan(0, ret)); if (io::isError(writeret) || gsl::narrow<size_t>(writeret) != ret) { return -1; } read_size += ret; } } filterStream->close(); return gsl::narrow<int64_t>(read_size); }); success_ = filterStream->isFinished(); return gsl::narrow<int64_t>(flow_->getSize()); } }; void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; void initialize() override; private: static std::string toMimeType(io::CompressionFormat format); void processFlowFile(const std::shared_ptr<core::FlowFile>& flowFile, core::ProcessSession& session); int compressLevel_{}; compress_content::CompressionMode compressMode_; compress_content::ExtendedCompressionFormat compressFormat_; bool updateFileName_ = false; bool encapsulateInTar_ = false; uint64_t batchSize_{1}; static const std::map<std::string, io::CompressionFormat> compressionFormatMimeTypeMap_; static const std::map<io::CompressionFormat, std::string> fileExtension_; }; } // namespace org::apache::nifi::minifi::processors