libminifi/include/core/ProcessorNode.h (140 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <memory> #include <set> #include <string> #include <utility> #include <vector> #include "ConfigurableComponent.h" #include "Connectable.h" #include "Property.h" namespace org::apache::nifi::minifi::core { /** * Processor node functions as a pass through to the implementing Connectables */ class ProcessorNode : public ConfigurableComponent, public Connectable { public: explicit ProcessorNode(Connectable* processor); ProcessorNode(const ProcessorNode &other) = delete; ProcessorNode(ProcessorNode &&other) = delete; ~ProcessorNode() override; ProcessorNode& operator=(const ProcessorNode &other) = delete; ProcessorNode& operator=(ProcessorNode &&other) = delete; /** * Get property using the provided name. * @param name property name. * @param value value passed in by reference * @return result of getting property. */ Connectable* getProcessor() const { return processor_; } void yield() override { processor_->yield(); } /** * Get property using the provided name. * @param name property name. * @param value value passed in by reference * @return result of getting property. */ template<typename T> bool getProperty(const std::string &name, T &value) { const auto processor_cast = dynamic_cast<ConfigurableComponent*>(processor_); if (nullptr != processor_cast) { return processor_cast->getProperty<T>(name, value); } else { return ConfigurableComponent::getProperty<T>(name, value); } } /** * Sets the property using the provided name * @param property name * @param value property value. * @return result of setting property. */ bool setProperty(const std::string &name, const std::string& value) { const auto processor_cast = dynamic_cast<ConfigurableComponent*>(processor_); bool ret = ConfigurableComponent::setProperty(name, value); if (nullptr != processor_cast) ret = processor_cast->setProperty(name, value); return ret; } /** * Get dynamic property using the provided name. * @param name property name. * @param value value passed in by reference * @return result of getting property. */ bool getDynamicProperty(const std::string name, std::string &value) const { const auto processor_cast = dynamic_cast<ConfigurableComponent*>(processor_); if (processor_cast) { return processor_cast->getDynamicProperty(name, value); } else { return ConfigurableComponent::getDynamicProperty(name, value); } } /** * Returns theflow version * @returns flow version. can be null if a flow version is not tracked. */ std::shared_ptr<state::FlowIdentifier> getFlowIdentifier() const override { if (processor_ != nullptr) { return processor_->getFlowIdentifier(); } else { return connectable_version_; } } /** * Sets the dynamic property using the provided name * @param property name * @param value property value. * @return result of setting property. */ bool setDynamicProperty(const std::string& name, const std::string& value) { const auto processor_cast = dynamic_cast<ConfigurableComponent*>(processor_); auto ret = ConfigurableComponent::setDynamicProperty(name, value); if (processor_cast) { ret = processor_cast->setDynamicProperty(name, value); } return ret; } /** * Gets list of dynamic property keys * @param name property name. * @param value value passed in by reference * @return result of getting property. */ std::vector<std::string> getDynamicPropertyKeys() const { const auto processor_cast = dynamic_cast<ConfigurableComponent*>(processor_); if (processor_cast) { return processor_cast->getDynamicPropertyKeys(); } else { return ConfigurableComponent::getDynamicPropertyKeys(); } } /** * Sets the property using the provided name * @param property name * @param value property value. * @return whether property was set or not */ bool setProperty(const Property &prop, const std::string& value) { const auto processor_cast = dynamic_cast<ConfigurableComponent*>(processor_); bool ret = ConfigurableComponent::setProperty(prop, value); if (nullptr != processor_cast) ret = processor_cast->setProperty(prop, value); return ret; } void setAutoTerminatedRelationships(std::vector<Relationship> relationships) { processor_->setAutoTerminatedRelationships(relationships); } bool isAutoTerminated(const Relationship& relationship) { return processor_->isAutoTerminated(relationship); } bool isSupportedRelationship(const Relationship& relationship) { return processor_->isSupportedRelationship(relationship); } /** * Set name. * @param name */ void setName(std::string name) override { Connectable::setName(name); processor_->setName(std::move(name)); } /** * Set UUID in this instance * @param uuid uuid to apply to the internal representation. */ void setUUID(const utils::Identifier& uuid) override { Connectable::setUUID(uuid); processor_->setUUID(uuid); } std::chrono::milliseconds getPenalizationPeriod() { return processor_->getPenalizationPeriod(); } /** * Get outgoing connection based on relationship * @return set of outgoing connections. */ std::set<Connectable*> getOutGoingConnections(const std::string& relationship) override { return processor_->getOutGoingConnections(relationship); } /** * Get next incoming connection * @return next incoming connection */ Connectable* getNextIncomingConnection() { return processor_->getNextIncomingConnection(); } Connectable* pickIncomingConnection() override { return processor_->pickIncomingConnection(); } /** * @return true if incoming connections > 0 */ bool hasIncomingConnections() { return processor_->hasIncomingConnections(); } /** * Returns the UUID through the provided object. * @param uuid uuid struct to which we will copy the memory * @return success of request */ utils::Identifier getUUID() const { return processor_->getUUID(); } /** * Return the UUID string * @return the UUID str */ utils::SmallString<36> getUUIDStr() const { return processor_->getUUIDStr(); } // Get Process Name std::string getName() const override { return processor_->getName(); } uint8_t getMaxConcurrentTasks() { return processor_->getMaxConcurrentTasks(); } void setMaxConcurrentTasks(uint8_t tasks) override { processor_->setMaxConcurrentTasks(tasks); } bool supportsDynamicProperties() const override { return false; } bool supportsDynamicRelationships() const override { return false; } bool isRunning() const override; bool isWorkAvailable() override; protected: bool canEdit() override { return !processor_->isRunning(); } /** * internal connectable. */ Connectable* processor_; }; } // namespace org::apache::nifi::minifi::core