libminifi/include/core/Processor.h (181 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <utils/Id.h>
#include <algorithm>
#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "ConfigurableComponent.h"
#include "Connectable.h"
#include "Core.h"
#include "core/Annotation.h"
#include "DynamicProperty.h"
#include "Scheduling.h"
#include "utils/TimeUtil.h"
#include "core/state/nodes/MetricsBase.h"
#include "ProcessorMetrics.h"
#include "utils/gsl.h"
#include "OutputAttributeDefinition.h"
// Expands, inside a processor class, to a getProcessorType() override returning the
// unqualified class name: the result of className<decltype(*this)>() is split on "::"
// and the last component is returned.
#define ADD_GET_PROCESSOR_NAME \
  std::string getProcessorType() const override { \
    const auto qualified_name = org::apache::nifi::minifi::core::className<decltype(*this)>(); \
    const auto name_parts = org::apache::nifi::minifi::utils::StringUtils::split(qualified_name, "::"); \
    return name_parts.back(); \
  }
// Expands, inside a concrete processor class, to the overrides of Processor's trait
// accessors (supportsDynamicProperties, supportsDynamicRelationships, getInputRequirement,
// isSingleThreaded) plus getProcessorType() via ADD_GET_PROCESSOR_NAME.
// Expects the enclosing class to provide SupportsDynamicProperties,
// SupportsDynamicRelationships, InputRequirement and IsSingleThreaded.
// The Input type is fully qualified (as ADD_GET_PROCESSOR_NAME does for its names) so the
// expansion does not depend on an enclosing `minifi` namespace being visible.
#define ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS \
  bool supportsDynamicProperties() const override { return SupportsDynamicProperties; } \
  bool supportsDynamicRelationships() const override { return SupportsDynamicRelationships; } \
  org::apache::nifi::minifi::core::annotation::Input getInputRequirement() const override { return InputRequirement; } \
  bool isSingleThreaded() const override { return IsSingleThreaded; } \
  ADD_GET_PROCESSOR_NAME
namespace org::apache::nifi::minifi {
// Forward declarations: this header refers to these types only through raw or smart
// pointers, so their full definitions are not needed here.
class Connection;
namespace io { class StreamFactory; }
namespace core {
class ProcessContext;
class ProcessSession;
class ProcessSessionFactory;

// Lower bound that Processor::setSchedulingPeriod applies to any requested scheduling period.
// `inline` (C++17) gives this header-defined constant a single definition shared by all
// translation units instead of one internal-linkage copy per TU.
inline constexpr std::chrono::microseconds MINIMUM_SCHEDULING_PERIOD{30};

// NOTE(review): this is defined after all of this header's #includes, so it only affects
// code that follows this point in a translation unit -- confirm that is the intent.
#define BUILDING_DLL 1
/**
 * Common base class of processors.
 *
 * Provides accessors for the scheduling configuration (scheduled state, strategy,
 * scheduling period, cron period, run duration, yield and penalization periods), an
 * active-task counter, yield bookkeeping and the processor metrics (exposed as a
 * response node). Concrete processors implement isSingleThreaded(), getProcessorType()
 * and getInputRequirement() -- ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS generates
 * these -- and override the onSchedule()/onTrigger() hooks as needed.
 */
class Processor : public Connectable, public ConfigurableComponent, public state::response::ResponseNodeSource {
 public:
  Processor(std::string name, const utils::Identifier& uuid, std::shared_ptr<ProcessorMetrics> metrics = nullptr);
  explicit Processor(std::string name, std::shared_ptr<ProcessorMetrics> metrics = nullptr);

  // Non-copyable; since the copy operations are user-declared, no move operations are
  // generated either.
  Processor(const Processor& parent) = delete;
  Processor& operator=(const Processor& parent) = delete;

  bool isRunning() const override;

  ~Processor() override;

  void setScheduledState(ScheduledState state);

  ScheduledState getScheduledState() const {
    return state_;
  }

  void setSchedulingStrategy(SchedulingStrategy strategy) {
    strategy_ = strategy;
  }

  SchedulingStrategy getSchedulingStrategy() const {
    return strategy_;
  }

  // Stores the scheduling period, raised to MINIMUM_SCHEDULING_PERIOD if it is shorter.
  void setSchedulingPeriod(std::chrono::steady_clock::duration period) {
    scheduling_period_ = std::max(std::chrono::steady_clock::duration(MINIMUM_SCHEDULING_PERIOD), period);
  }

  std::chrono::steady_clock::duration getSchedulingPeriod() const {
    return scheduling_period_;
  }

  // NOTE(review): cron_period_ is a plain std::string (unlike the atomic fields below);
  // presumably it is only set while the processor is not running -- confirm.
  void setCronPeriod(const std::string &period) {
    cron_period_ = period;
  }

  std::string getCronPeriod() const {
    return cron_period_;
  }

  void setRunDurationNano(std::chrono::steady_clock::duration period) {
    run_duration_ = period;
  }

  std::chrono::steady_clock::duration getRunDurationNano() const {
    return run_duration_;
  }

  void setYieldPeriodMsec(std::chrono::milliseconds period) {
    yield_period_ = period;
  }

  std::chrono::steady_clock::duration getYieldPeriod() const {
    return yield_period_;
  }

  void setPenalizationPeriod(std::chrono::milliseconds period) {
    penalization_period_ = period;
  }

  void setMaxConcurrentTasks(uint8_t tasks) override;

  virtual bool isSingleThreaded() const = 0;

  virtual std::string getProcessorType() const = 0;

  void setTriggerWhenEmpty(bool value) {
    _triggerWhenEmpty = value;
  }

  bool getTriggerWhenEmpty() const {
    return _triggerWhenEmpty;
  }

  uint8_t getActiveTasks() const {
    return active_tasks_;
  }

  void incrementActiveTasks() {
    active_tasks_++;
  }

  // Decrements the active-task counter, never going below zero.
  // A compare-and-swap loop is used because a separate "check > 0, then decrement" on the
  // atomic lets two concurrent callers both observe 1 and wrap the uint8_t counter to 255.
  void decrementActiveTask() {
    uint8_t current = active_tasks_.load();
    while (current > 0 && !active_tasks_.compare_exchange_weak(current, static_cast<uint8_t>(current - 1))) {
      // on failure compare_exchange_weak reloaded `current`; retry with the fresh value
    }
  }

  void clearActiveTask() {
    active_tasks_ = 0;
  }

  // Yield handling (implemented out of line).
  void yield() override;

  void yield(std::chrono::steady_clock::duration delta_time);

  virtual bool isYield();

  void clearYield();

  std::chrono::steady_clock::duration getYieldTime() const;

  // Whether the flow file queue is full in any of the outgoing connections.
  bool flowFilesOutGoingFull() const;

  bool addConnection(Connectable* connection);

  virtual void onTrigger(const std::shared_ptr<ProcessContext> &context, const std::shared_ptr<ProcessSessionFactory> &sessionFactory);

  void onTrigger(ProcessContext *context, ProcessSessionFactory *sessionFactory);

  // Configuration may only be edited while the processor is not running.
  bool canEdit() override {
    return !isRunning();
  }

  // Per-session trigger hook; by default forwards to the raw-pointer overload.
  virtual void onTrigger(const std::shared_ptr<ProcessContext> &context, const std::shared_ptr<ProcessSession> &session) {
    onTrigger(context.get(), session.get());
  }

  // Raw-pointer trigger hook; the default implementation does nothing.
  virtual void onTrigger(ProcessContext* /*context*/, ProcessSession* /*session*/) {
  }

  void initialize() override {
  }

  // Scheduling hook; by default forwards to the raw-pointer overload.
  virtual void onSchedule(const std::shared_ptr<ProcessContext> &context, const std::shared_ptr<ProcessSessionFactory> &sessionFactory) {
    onSchedule(context.get(), sessionFactory.get());
  }

  // Raw-pointer scheduling hook; the default implementation does nothing.
  virtual void onSchedule(ProcessContext* /*context*/, ProcessSessionFactory* /*sessionFactory*/) {
  }

  // Hook executed when onSchedule fails (throws). Configuration should be reset in this hook.
  // The default implementation calls notifyStop().
  virtual void onUnSchedule() {
    notifyStop();
  }

  // Check all incoming connections for work.
  bool isWorkAvailable() override;

  void setStreamFactory(std::shared_ptr<minifi::io::StreamFactory> stream_factory) {
    stream_factory_ = std::move(stream_factory);
  }

  bool isThrottledByBackpressure() const;

  Connectable* pickIncomingConnection() override;

  void validateAnnotations() const;

  virtual annotation::Input getInputRequirement() const = 0;

  // Exposes this processor's metrics object as its response node.
  state::response::SharedResponseNode getResponseNode() override {
    return metrics_;
  }

  // Empty defaults; presumably redeclared by processors that have dynamic properties or
  // output attributes -- confirm against derived classes.
  static constexpr auto DynamicProperties = std::array<DynamicProperty, 0>{};

  static constexpr auto OutputAttributes = std::array<OutputAttributeReference, 0>{};

 protected:
  // Called by the default onUnSchedule(); the base implementation does nothing.
  virtual void notifyStop() {
  }

  std::shared_ptr<minifi::io::StreamFactory> stream_factory_;

  std::atomic<ScheduledState> state_;

  std::atomic<std::chrono::steady_clock::duration> scheduling_period_;

  std::atomic<std::chrono::steady_clock::duration> run_duration_;

  std::atomic<std::chrono::steady_clock::duration> yield_period_;

  std::atomic<uint8_t> active_tasks_;

  std::atomic<bool> _triggerWhenEmpty;

  std::string cron_period_;

  gsl::not_null<std::shared_ptr<ProcessorMetrics>> metrics_;

 private:
  mutable std::mutex mutex_;

  std::atomic<std::chrono::steady_clock::time_point> yield_expiration_{};

  // Process-wide mutex guarding the reachability graph (a function-local static, so it is
  // shared by all Processor instances).
  static std::mutex& getGraphMutex() {
    static std::mutex mutex{};
    return mutex;
  }

  // The caller must hold the graph mutex (getGraphMutex()); graph_lock documents that.
  void updateReachability(const std::lock_guard<std::mutex>& graph_lock, bool force = false);

  static bool partOfCycle(Connection* conn);

  // An outgoing connection allows us to reach these nodes.
  std::unordered_map<Connection*, std::unordered_set<Processor*>> reachable_processors_;

  std::shared_ptr<logging::Logger> logger_;
};
} // namespace core
} // namespace org::apache::nifi::minifi