extensions/python/PythonScriptEngine.cpp (138 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <string> #include <filesystem> #include "PythonScriptEngine.h" #include "PythonBindings.h" #include "types/PyProcessSession.h" #include "types/PyProcessContext.h" #include "types/PyProcessor.h" #include "types/PyLogger.h" #include "types/PyRelationship.h" namespace org::apache::nifi::minifi::extensions::python { Interpreter* Interpreter::getInterpreter() { static Interpreter interpreter; return &interpreter; } GlobalInterpreterLock::GlobalInterpreterLock() { gil_state_ = PyGILState_Ensure(); } GlobalInterpreterLock::~GlobalInterpreterLock() { PyGILState_Release(gil_state_); } namespace { // PyEval_InitThreads might be marked deprecated (depending on the version of Python.h) // Python <= 3.6: This needs to be called manually after Py_Initialize to initialize threads // Python >= 3.7: Noop function since its functionality is included in Py_Initialize // Python >= 3.9: Marked as deprecated (still noop) // This can be removed if we drop the support for Python 3.6 void initThreads() { #if defined(__clang__) #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wdeprecated-declarations" #elif defined(__GNUC__) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wdeprecated-declarations" #elif defined(WIN32) #pragma warning(push) #pragma warning(disable: 4996) #endif if (!PyEval_ThreadsInitialized()) PyEval_InitThreads(); #if defined(__clang__) #pragma clang diagnostic pop #elif defined(__GNUC__) #pragma GCC diagnostic pop #elif defined(WIN32) #pragma warning(pop) #endif } } // namespace Interpreter::Interpreter() { Py_Initialize(); initThreads(); PyInit_minifi_native(); saved_thread_state_ = PyEval_SaveThread(); } Interpreter::~Interpreter() { PyEval_RestoreThread(saved_thread_state_); Py_Finalize(); } PythonScriptEngine::PythonScriptEngine() { Interpreter::getInterpreter(); GlobalInterpreterLock lock; bindings_ = OwnedDict::create(); } PythonScriptEngine::~PythonScriptEngine() { GlobalInterpreterLock lock; bindings_.resetReference(); } void PythonScriptEngine::eval(const std::string& script) { GlobalInterpreterLock gil; try { evaluateModuleImports(); evalInternal(script); } catch (const std::exception& e) { throw PythonScriptException(e.what()); } } void PythonScriptEngine::evalFile(const std::filesystem::path& file_name) { GlobalInterpreterLock gil; try { evaluateModuleImports(); std::ifstream file(file_name, std::ios::in); if (!file.is_open()) { throw PythonScriptException(fmt::format("Couldn't open {}", file_name.string())); } std::string content{std::istreambuf_iterator<char>(file), std::istreambuf_iterator<char>()}; auto compiled_string = OwnedObject(Py_CompileString(content.c_str(), file_name.string().c_str(), Py_file_input)); if (!compiled_string.get()) { throw PyException(); } const auto result = OwnedObject(PyEval_EvalCode(compiled_string.get(), bindings_.get(), bindings_.get())); if (!result.get()) { throw PyException(); } } catch (const std::exception &e) { throw PythonScriptException(e.what()); } } void PythonScriptEngine::onInitialize(core::Processor* proc) { auto newproc = std::make_shared<python::PythonProcessor>(proc); call("onInitialize", std::weak_ptr(newproc)); } void PythonScriptEngine::describe(core::Processor* proc) { auto newproc = std::make_shared<python::PythonProcessor>(proc); callRequiredFunction("describe", std::weak_ptr(newproc)); } void PythonScriptEngine::onSchedule(const std::shared_ptr<core::ProcessContext> &context) { call("onSchedule", std::weak_ptr(context)); } void PythonScriptEngine::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { auto py_session = std::make_shared<python::PyProcessSession>(session); call("onTrigger", std::weak_ptr(context), std::weak_ptr(py_session)); } void PythonScriptEngine::initialize(const core::Relationship& success, const core::Relationship& failure, const std::shared_ptr<core::logging::Logger>& logger) { bind("log", std::weak_ptr<core::logging::Logger>(logger)); bind("REL_SUCCESS", success); bind("REL_FAILURE", failure); } void PythonScriptEngine::evalInternal(std::string_view script) { const auto script_file = "# -*- coding: utf-8 -*-\n" + std::string(script); auto compiled_string = OwnedObject(Py_CompileString(script_file.c_str(), "<string>", Py_file_input)); if (!compiled_string.get()) { throw PyException(); } const auto result = OwnedObject(PyEval_EvalCode(compiled_string.get(), bindings_.get(), bindings_.get())); if (!result.get()) { throw PyException(); } } void PythonScriptEngine::evaluateModuleImports() { bindings_.put("__builtins__", OwnedObject(PyImport_ImportModule("builtins"))); if (module_paths_.empty()) { return; } evalInternal("import sys"); for (const auto& module_path : module_paths_) { if (std::filesystem::is_regular_file(module_path)) { evalInternal("sys.path.append(r'" + module_path.parent_path().string() + "')"); } else { evalInternal("sys.path.append(r'" + module_path.string() + "')"); } } } } // namespace org::apache::nifi::minifi::extensions::python