libminifi/include/core/FlowConfiguration.h (100 lines of code) (raw):
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <utility>
#include <vector>
#include <filesystem>
#include "core/Core.h"
#include "Connection.h"
#include "RemoteProcessorGroupPort.h"
#include "core/controller/ControllerServiceNode.h"
#include "core/controller/StandardControllerServiceProvider.h"
#include "provenance/Provenance.h"
#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
#include "core/Processor.h"
#include "core/logging/LoggerFactory.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/ProcessGroup.h"
#include "core/state/nodes/FlowInformation.h"
#include "utils/file/FileSystem.h"
#include "utils/ChecksumCalculator.h"
#include "ParameterContext.h"
#include "ParameterProvider.h"
namespace org::apache::nifi::minifi::core {
class static_initializers {
public:
std::vector<std::string> statics_sl_funcs_;
std::mutex atomic_initialization_;
};
extern static_initializers &get_static_functions();
struct ConfigurationContext {
std::shared_ptr<core::Repository> flow_file_repo;
std::shared_ptr<core::ContentRepository> content_repo;
std::shared_ptr<Configure> configuration;
std::optional<std::filesystem::path> path{std::nullopt};
std::shared_ptr<utils::file::FileSystem> filesystem{std::make_shared<utils::file::FileSystem>()};
std::optional<utils::crypto::EncryptionProvider> sensitive_values_encryptor{std::nullopt};
};
enum class FlowSerializationType { Json, NifiJson, Yaml };
/**
* Purpose: Flow configuration defines the mechanism
* by which we will configure our flow controller
*/
class FlowConfiguration : public CoreComponentImpl {
public:
/**
* Constructor that will be used for configuring
* the flow controller.
*/
explicit FlowConfiguration(ConfigurationContext ctx);
~FlowConfiguration() override;
// Create Processor (Node/Input/Output Port) based on the name
std::unique_ptr<core::Processor> createProcessor(const std::string &name, const utils::Identifier &uuid);
std::unique_ptr<core::Processor> createProcessor(const std::string &name, const std::string &fullname, const utils::Identifier &uuid);
// Create Root Processor Group
static std::unique_ptr<core::ProcessGroup> createRootProcessGroup(const std::string &name, const utils::Identifier &uuid, int version);
static std::unique_ptr<core::ProcessGroup> createSimpleProcessGroup(const std::string &name, const utils::Identifier &uuid, int version);
static std::unique_ptr<core::ProcessGroup> createRemoteProcessGroup(const std::string &name, const utils::Identifier &uuid);
std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string &class_name, const std::string &full_class_name, const std::string &name,
const utils::Identifier &uuid);
// Create Connection
[[nodiscard]] std::unique_ptr<minifi::Connection> createConnection(const std::string &name, const utils::Identifier &uuid) const;
// Create Provenance Report Task
std::unique_ptr<core::reporting::SiteToSiteProvenanceReportingTask> createProvenanceReportTask();
static std::unique_ptr<core::ParameterProvider> createParameterProvider(const std::string &class_name, const std::string &full_class_name, const utils::Identifier& uuid);
[[nodiscard]] std::shared_ptr<state::response::FlowVersion> getFlowVersion() const {
return flow_version_;
}
virtual std::vector<std::string> getSupportedFormats() const {
return {};
}
std::shared_ptr<Configure> getConfiguration() { // cannot be const as getters mutate the underlying map
return configuration_;
}
bool persist(const core::ProcessGroup& process_group);
bool persist(const std::string& serialized_flow);
/**
* Returns the configuration path string
* @return config_path_
*/
const std::optional<std::filesystem::path>& getConfigurationPath() {
return config_path_;
}
virtual std::unique_ptr<core::ProcessGroup> getRoot() {
return nullptr;
}
std::unique_ptr<core::ProcessGroup> updateFromPayload(const std::string& url, const std::string& yamlConfigPayload, const std::optional<std::string>& flow_id = std::nullopt);
virtual std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string& /*yamlConfigPayload*/) {
return nullptr;
}
std::shared_ptr<core::controller::StandardControllerServiceProvider> getControllerServiceProvider() const {
return service_provider_;
}
utils::ChecksumCalculator& getChecksumCalculator() { return checksum_calculator_; }
const std::unordered_map<std::string, gsl::not_null<std::unique_ptr<ParameterContext>>>& getParameterContexts() const {
return parameter_contexts_;
}
protected:
std::unordered_map<std::string, gsl::not_null<std::unique_ptr<ParameterContext>>> parameter_contexts_;
std::vector<gsl::not_null<std::unique_ptr<ParameterProvider>>> parameter_providers_;
std::optional<std::filesystem::path> config_path_;
std::shared_ptr<core::Repository> flow_file_repo_;
std::shared_ptr<core::ContentRepository> content_repo_;
std::shared_ptr<Configure> configuration_;
std::shared_ptr<core::controller::StandardControllerServiceProvider> service_provider_;
std::shared_ptr<state::response::FlowVersion> flow_version_;
std::shared_ptr<utils::file::FileSystem> filesystem_;
utils::crypto::EncryptionProvider sensitive_values_encryptor_;
utils::ChecksumCalculator checksum_calculator_;
private:
virtual std::string serialize(const ProcessGroup&) { return ""; }
std::shared_ptr<logging::Logger> logger_;
};
} // namespace org::apache::nifi::minifi::core