extensions/standard-processors/processors/SplitContent.h (94 lines of code) (raw):

/** * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <memory> #include <optional> #include <string_view> #include "FlowFileRecord.h" #include "core/ProcessSession.h" #include "core/Processor.h" #include "core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" #include "minifi-cpp/core/PropertyValidator.h" #include "core/RelationshipDefinition.h" #include "utils/Export.h" namespace org::apache::nifi::minifi::processors { class SplitContent final : public core::ProcessorImpl { public: explicit SplitContent(const std::string_view name, const utils::Identifier& uuid = {}) : ProcessorImpl(name, uuid) { logger_ = core::logging::LoggerFactory<SplitContent>::getLogger(uuid_); } using size_type = std::vector<std::byte>::size_type; enum class ByteSequenceFormat { Hexadecimal, Text }; enum class ByteSequenceLocation { Trailing, Leading }; EXTENSIONAPI static constexpr auto Description = "Splits incoming FlowFiles by a specified byte sequence"; EXTENSIONAPI static constexpr auto ByteSequenceFormatProperty = core::PropertyDefinitionBuilder<2>::createProperty("Byte Sequence Format") .withDescription("Specifies how the <Byte Sequence> property should be interpreted") .isRequired(true) .withDefaultValue(magic_enum::enum_name(ByteSequenceFormat::Hexadecimal)) .withAllowedValues(magic_enum::enum_names<ByteSequenceFormat>()) .build(); EXTENSIONAPI static constexpr auto ByteSequence = core::PropertyDefinitionBuilder<>::createProperty("Byte Sequence") .withDescription("A representation of bytes to look for and upon which to split the source file into separate files") .isRequired(true) .withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR) .build(); EXTENSIONAPI static constexpr auto KeepByteSequence = core::PropertyDefinitionBuilder<>::createProperty("Keep Byte Sequence") .withDescription("Determines whether or not the Byte Sequence should be included with each Split") .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR) .withDefaultValue("false") .isRequired(true) .build(); EXTENSIONAPI static constexpr auto ByteSequenceLocationProperty = core::PropertyDefinitionBuilder<2>::createProperty("Byte Sequence Location") .withDescription( "If <Keep Byte Sequence> is set to true, specifies whether the byte sequence should be added to the end of the first split or the beginning of the second; " "if <Keep Byte Sequence> is false, this property is ignored.") .withDefaultValue(magic_enum::enum_name(ByteSequenceLocation::Trailing)) .withAllowedValues(magic_enum::enum_names<ByteSequenceLocation>()) .isRequired(true) .build(); EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({ByteSequenceFormatProperty, ByteSequence, KeepByteSequence, ByteSequenceLocationProperty}); EXTENSIONAPI static constexpr auto Splits = core::RelationshipDefinition{"splits", "All Splits will be routed to the splits relationship"}; EXTENSIONAPI static constexpr auto Original = core::RelationshipDefinition{"original", "The original file"}; EXTENSIONAPI static constexpr auto Relationships = std::array{Original, Splits}; EXTENSIONAPI static constexpr auto FragmentIdentifierOutputAttribute = core::OutputAttributeDefinition<0>{"fragment.identifier", {}, "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"}; EXTENSIONAPI static constexpr auto FragmentIndexOutputAttribute = core::OutputAttributeDefinition<0>{"fragment.index", {}, "A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"}; EXTENSIONAPI static constexpr auto FragmentCountOutputAttribute = core::OutputAttributeDefinition<0>{"fragment.count", {}, "The number of split FlowFiles generated from the parent FlowFile"}; EXTENSIONAPI static constexpr auto SegmentOriginalFilenameOutputAttribute = core::OutputAttributeDefinition<0>{"segment.original.filename", {}, "The filename of the parent FlowFile"}; EXTENSIONAPI static constexpr auto OutputAttributes = std::array<core::OutputAttributeReference, 4>{FragmentIdentifierOutputAttribute, FragmentIndexOutputAttribute, FragmentCountOutputAttribute, SegmentOriginalFilenameOutputAttribute}; EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; EXTENSIONAPI static constexpr auto InputRequirement = core::annotation::Input::INPUT_REQUIRED; EXTENSIONAPI static constexpr bool IsSingleThreaded = true; ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; void initialize() override; class ByteSequenceMatcher { public: using size_type = std::vector<std::byte>::size_type; explicit ByteSequenceMatcher(std::vector<std::byte> byte_sequence); size_type getNumberOfMatchingBytes(size_type number_of_currently_matching_bytes, std::byte next_byte); size_type getPreviousMaxMatch(size_type number_of_currently_matching_bytes); [[nodiscard]] std::span<const std::byte> getByteSequence() const { return byte_sequence_; } private: struct node { std::byte byte; std::unordered_map<std::byte, size_type> cache; std::optional<size_type> previous_max_match; }; std::vector<node> byte_sequence_nodes_; const std::vector<std::byte> byte_sequence_; }; private: std::optional<ByteSequenceMatcher> byte_sequence_matcher_; bool keep_byte_sequence = false; ByteSequenceLocation byte_sequence_location_ = ByteSequenceLocation::Trailing; size_t buffer_size_{}; }; } // namespace org::apache::nifi::minifi::processors