extensions/standard-processors/processors/RetryFlowFile.h (130 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <atomic> #include <memory> #include <optional> #include <queue> #include <string> #include <string_view> #include <utility> #include <vector> #include "core/Core.h" #include "core/OutputAttributeDefinition.h" #include "core/Processor.h" #include "core/ProcessContext.h" #include "core/ProcessSession.h" #include "core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" #include "minifi-cpp/core/PropertyValidator.h" #include "core/RelationshipDefinition.h" #include "core/logging/LoggerFactory.h" #include "utils/OptionalUtils.h" #include "utils/Export.h" namespace org::apache::nifi::minifi::processors { class RetryFlowFile : public core::ProcessorImpl { public: explicit RetryFlowFile(const std::string_view name, const utils::Identifier& uuid = {}) : ProcessorImpl(name, uuid) { logger_ = core::logging::LoggerFactory<RetryFlowFile>::getLogger(uuid_); } ~RetryFlowFile() override = default; // ReuseMode allowed values EXTENSIONAPI static constexpr std::string_view FAIL_ON_REUSE = "Fail on Reuse"; EXTENSIONAPI static constexpr std::string_view WARN_ON_REUSE = "Warn on Reuse"; EXTENSIONAPI static constexpr std::string_view RESET_REUSE = "Reset Reuse"; EXTENSIONAPI static constexpr const char* Description = "FlowFiles passed to this Processor have a 'Retry Attribute' value checked against a configured 'Maximum Retries' value. " "If the current attribute value is below the configured maximum, the FlowFile is passed to a retry relationship. " "The FlowFile may or may not be penalized in that condition. If the FlowFile's attribute value exceeds the configured maximum, " "the FlowFile will be passed to a 'retries_exceeded' relationship. " "WARNING: If the incoming FlowFile has a non-numeric value in the configured 'Retry Attribute' attribute, it will be reset to '1'. " "You may choose to fail the FlowFile instead of performing the reset. Additional dynamic properties can be defined for any attributes " "you wish to add to the FlowFiles transferred to 'retries_exceeded'. These attributes support attribute expression language."; EXTENSIONAPI static constexpr auto RetryAttribute = core::PropertyDefinitionBuilder<>::createProperty("Retry Attribute") .withDescription( "The name of the attribute that contains the current retry count for the FlowFile." "WARNING: If the name matches an attribute already on the FlowFile that does not contain a numerical value, " "the processor will either overwrite that attribute with '1' or fail based on configuration.") .withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR) .withDefaultValue("flowfile.retries") .supportsExpressionLanguage(true) .isRequired(true) .build(); EXTENSIONAPI static constexpr auto MaximumRetries = core::PropertyDefinitionBuilder<>::createProperty("Maximum Retries") .withDescription("The maximum number of times a FlowFile can be retried before being passed to the 'retries_exceeded' relationship.") .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR) .withDefaultValue("3") .supportsExpressionLanguage(true) .isRequired(true) .build(); EXTENSIONAPI static constexpr auto PenalizeRetries = core::PropertyDefinitionBuilder<>::createProperty("Penalize Retries") .withDescription("If set to 'true', this Processor will penalize input FlowFiles before passing them to the 'retry' relationship. This does not apply to the 'retries_exceeded' relationship.") .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR) .withDefaultValue("true") .isRequired(true) .build(); EXTENSIONAPI static constexpr auto FailOnNonNumericalOverwrite = core::PropertyDefinitionBuilder<>::createProperty("Fail on Non-numerical Overwrite") .withDescription("If the FlowFile already has the attribute defined in 'Retry Attribute' that is *not* a number, fail the FlowFile instead of resetting that value to '1'") .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR) .withDefaultValue("false") .isRequired(true) .build(); EXTENSIONAPI static constexpr auto ReuseMode = core::PropertyDefinitionBuilder<3>::createProperty("Reuse Mode") .withDescription( "Defines how the Processor behaves if the retry FlowFile has a different retry UUID than " "the instance that received the FlowFile. This generally means that the attribute was " "not reset after being successfully retried by a previous instance of this processor.") .withAllowedValues({FAIL_ON_REUSE, WARN_ON_REUSE, RESET_REUSE}) .withDefaultValue(FAIL_ON_REUSE) .isRequired(true) .build(); EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({ RetryAttribute, MaximumRetries, PenalizeRetries, FailOnNonNumericalOverwrite, ReuseMode }); EXTENSIONAPI static constexpr auto Retry = core::RelationshipDefinition{"retry", "Input FlowFile has not exceeded the configured maximum retry count, pass this relationship back to the input Processor to create a limited feedback loop."}; EXTENSIONAPI static constexpr auto RetriesExceeded = core::RelationshipDefinition{"retries_exceeded", "Input FlowFile has exceeded the configured maximum retry count, do not pass this relationship back to the input Processor to terminate the limited feedback loop."}; EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "The processor is configured such that a non-numerical value on 'Retry Attribute' results in a failure instead of resetting " "that value to '1'. This will immediately terminate the limited feedback loop. Might also include when 'Maximum Retries' contains " " attribute expression language that does not resolve to an Integer."}; EXTENSIONAPI static constexpr auto Relationships = std::array{ Retry, RetriesExceeded, Failure }; EXTENSIONAPI static constexpr auto RetryOutputAttribute = core::OutputAttributeDefinition<0>{"Retry Attribute", {}, "User defined retry attribute is updated with the current retry count"}; EXTENSIONAPI static constexpr auto RetryWithUuidOutputAttribute = core::OutputAttributeDefinition<0>{"Retry Attribute .uuid", {}, "User defined retry attribute with .uuid suffix is updated with the UUID of the processor that retried the FlowFile last"}; EXTENSIONAPI static constexpr auto OutputAttributes = std::array<core::OutputAttributeReference, 2>{ RetryOutputAttribute, RetryWithUuidOutputAttribute }; EXTENSIONAPI static constexpr bool SupportsDynamicProperties = true; EXTENSIONAPI static constexpr auto RetriesExceededAttribute = core::DynamicProperty{"Exceeded FlowFile Attribute Key", "The value of the attribute added to the FlowFile", "One or more dynamic properties can be used to add attributes to FlowFiles passed to the 'retries_exceeded' relationship.", true}; EXTENSIONAPI static constexpr auto DynamicProperties = std::array{RetriesExceededAttribute}; EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED; EXTENSIONAPI static constexpr bool IsSingleThreaded = false; ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) override; void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; void initialize() override; private: void readDynamicPropertyKeys(const core::ProcessContext& context); std::optional<uint64_t> getRetryPropertyValue(const std::shared_ptr<core::FlowFile>& flow_file) const; void setRetriesExceededAttributesOnFlowFile(const core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) const; std::string retry_attribute_; uint64_t maximum_retries_ = 3; // The real default value is set by the default on the MaximumRetries property bool penalize_retries_ = true; // The real default value is set by the default on the PenalizeRetries property bool fail_on_non_numerical_overwrite_ = false; // The real default value is set by the default on the FailOnNonNumericalOverwrite property std::string reuse_mode_; std::vector<core::Property> exceeded_flowfile_attribute_keys_; }; } // namespace org::apache::nifi::minifi::processors