utils/include/core/Processor.h (182 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <string_view>
#include <unordered_set>
#include <unordered_map>
#include <utility>
#include <vector>
#include "core/ConfigurableComponentImpl.h"
#include "core/Connectable.h"
#include "core/Property.h"
#include "core/Core.h"
#include "minifi-cpp/core/Annotation.h"
#include "minifi-cpp/core/DynamicProperty.h"
#include "minifi-cpp/core/Scheduling.h"
#include "minifi-cpp/core/state/nodes/MetricsBase.h"
#include "minifi-cpp/core/ProcessorMetrics.h"
#include "utils/gsl.h"
#include "utils/Id.h"
#include "minifi-cpp/core/OutputAttributeDefinition.h"
#include "minifi-cpp/core/Processor.h"
#define ADD_GET_PROCESSOR_NAME \
std::string getProcessorType() const override { \
auto class_name = org::apache::nifi::minifi::core::className<decltype(*this)>(); \
auto splitted = org::apache::nifi::minifi::utils::string::split(class_name, "::"); \
return splitted[splitted.size() - 1]; \
}
#define ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS \
bool supportsDynamicProperties() const override { return SupportsDynamicProperties; } \
bool supportsDynamicRelationships() const override { return SupportsDynamicRelationships; } \
minifi::core::annotation::Input getInputRequirement() const override { return InputRequirement; } \
bool isSingleThreaded() const override { return IsSingleThreaded; } \
ADD_GET_PROCESSOR_NAME
namespace org::apache::nifi::minifi {
class Connection;
namespace core {
class ProcessContext;
class ProcessSession;
class ProcessSessionFactory;
constexpr std::chrono::microseconds MINIMUM_SCHEDULING_PERIOD{30};
#define BUILDING_DLL 1
class ProcessorImpl : public virtual Processor, public ConnectableImpl, public ConfigurableComponentImpl {
public:
ProcessorImpl(std::string_view name, const utils::Identifier& uuid, std::shared_ptr<ProcessorMetrics> metrics = nullptr);
explicit ProcessorImpl(std::string_view name, std::shared_ptr<ProcessorMetrics> metrics = nullptr);
ProcessorImpl(const ProcessorImpl& parent) = delete;
ProcessorImpl& operator=(const ProcessorImpl& parent) = delete;
bool isRunning() const override;
~ProcessorImpl() override;
void setScheduledState(ScheduledState state) override;
ScheduledState getScheduledState() const override {
return state_;
}
void setSchedulingStrategy(SchedulingStrategy strategy) override {
strategy_ = strategy;
}
SchedulingStrategy getSchedulingStrategy() const override {
return strategy_;
}
void setSchedulingPeriod(std::chrono::steady_clock::duration period) override {
scheduling_period_ = std::max(std::chrono::steady_clock::duration(MINIMUM_SCHEDULING_PERIOD), period);
}
std::chrono::steady_clock::duration getSchedulingPeriod() const override {
return scheduling_period_;
}
void setCronPeriod(const std::string &period) override {
cron_period_ = period;
}
std::string getCronPeriod() const override {
return cron_period_;
}
void setRunDurationNano(std::chrono::steady_clock::duration period) override {
run_duration_ = period;
}
std::chrono::steady_clock::duration getRunDurationNano() const override {
return (run_duration_);
}
void setYieldPeriodMsec(std::chrono::milliseconds period) override {
yield_period_ = period;
}
std::chrono::steady_clock::duration getYieldPeriod() const override {
return yield_period_;
}
void setPenalizationPeriod(std::chrono::milliseconds period) override {
penalization_period_ = period;
}
void setMaxConcurrentTasks(uint8_t tasks) override;
bool isSingleThreaded() const override = 0;
std::string getProcessorType() const override = 0;
void setTriggerWhenEmpty(bool value) override {
_triggerWhenEmpty = value;
}
bool getTriggerWhenEmpty() const override {
return (_triggerWhenEmpty);
}
uint8_t getActiveTasks() const override {
return (active_tasks_);
}
void incrementActiveTasks() override {
++active_tasks_;
}
void decrementActiveTask() override {
if (active_tasks_ > 0)
--active_tasks_;
}
void clearActiveTask() override {
active_tasks_ = 0;
}
std::string getProcessGroupUUIDStr() const override {
return process_group_uuid_;
}
void setProcessGroupUUIDStr(const std::string &uuid) override {
process_group_uuid_ = uuid;
}
void yield() override;
void yield(std::chrono::steady_clock::duration delta_time) override;
bool isYield() override;
void clearYield() override;
std::chrono::steady_clock::time_point getYieldExpirationTime() const override { return yield_expiration_; }
std::chrono::steady_clock::duration getYieldTime() const override;
// Whether flow file queue full in any of the outgoing connection
bool flowFilesOutGoingFull() const override;
bool addConnection(Connectable* connection) override;
bool canEdit() override {
return !isRunning();
}
void initialize() override {
}
void triggerAndCommit(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory) override;
void trigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& process_session) override;
void onTrigger(ProcessContext&, ProcessSession&) override {}
void onSchedule(ProcessContext&, ProcessSessionFactory&) override {}
// Hook executed when onSchedule fails (throws). Configuration should be reset in this
void onUnSchedule() override {
notifyStop();
}
// Check all incoming connections for work
bool isWorkAvailable() override;
bool isThrottledByBackpressure() const override;
Connectable* pickIncomingConnection() override;
void validateAnnotations() const override;
annotation::Input getInputRequirement() const override = 0;
state::response::SharedResponseNode getResponseNode() override {
return metrics_;
}
gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const override {
return metrics_;
}
static constexpr auto DynamicProperties = std::array<DynamicProperty, 0>{};
static constexpr auto OutputAttributes = std::array<OutputAttributeReference, 0>{};
protected:
virtual void notifyStop() {
}
std::atomic<ScheduledState> state_;
std::atomic<std::chrono::steady_clock::duration> scheduling_period_;
std::atomic<std::chrono::steady_clock::duration> run_duration_;
std::atomic<std::chrono::steady_clock::duration> yield_period_;
std::atomic<uint8_t> active_tasks_;
std::atomic<bool> _triggerWhenEmpty;
std::string cron_period_;
gsl::not_null<std::shared_ptr<ProcessorMetrics>> metrics_;
std::shared_ptr<logging::Logger> logger_;
private:
mutable std::mutex mutex_;
std::atomic<std::chrono::steady_clock::time_point> yield_expiration_{};
static std::mutex& getGraphMutex() {
static std::mutex mutex{};
return mutex;
}
// must hold the graphMutex
void updateReachability(const std::lock_guard<std::mutex>& graph_lock, bool force = false) override;
const std::unordered_map<Connection*, std::unordered_set<Processor*>>& reachable_processors() const override {
return reachable_processors_;
}
static bool partOfCycle(Connection* conn);
// an outgoing connection allows us to reach these nodes
std::unordered_map<Connection*, std::unordered_set<Processor*>> reachable_processors_;
std::string process_group_uuid_;
};
} // namespace core
} // namespace org::apache::nifi::minifi