extensions/python/PythonScriptEngine.cpp (138 lines of code) (raw):
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <string>
#include <filesystem>
#include "PythonScriptEngine.h"
#include "PythonBindings.h"
#include "types/PyProcessSession.h"
#include "types/PyProcessContext.h"
#include "types/PyProcessor.h"
#include "types/PyLogger.h"
#include "types/PyRelationship.h"
namespace org::apache::nifi::minifi::extensions::python {
// Returns the address of the single Interpreter instance.
// The instance is a function-local static, so it is constructed the first time this is called.
Interpreter* Interpreter::getInterpreter() {
  static Interpreter instance;
  return &instance;
}
// Calls PyGILState_Ensure() and keeps the returned state so the destructor can release it.
GlobalInterpreterLock::GlobalInterpreterLock()
    : gil_state_(PyGILState_Ensure()) {
}
// Passes the state stored in gil_state_ (obtained from PyGILState_Ensure() in the constructor)
// back to PyGILState_Release().
GlobalInterpreterLock::~GlobalInterpreterLock() {
  PyGILState_Release(gil_state_);
}
namespace {
// PyEval_InitThreads only has to be called manually (after Py_Initialize) on Python <= 3.6.
// Python <= 3.6: This needs to be called manually after Py_Initialize to initialize threads
// Python >= 3.7: Noop function since its functionality is included in Py_Initialize
// Python >= 3.9: PyEval_InitThreads and PyEval_ThreadsInitialized are marked as deprecated
// Python >= 3.13: Both functions are removed from the C API, so referencing them no longer compiles
// The calls are therefore only compiled for Python <= 3.6, where they are neither deprecated nor
// removed, which also makes the deprecation-warning suppression pragmas unnecessary.
// This function can be removed entirely if we drop the support for Python 3.6
void initThreads() {
#if PY_VERSION_HEX < 0x03070000
  if (!PyEval_ThreadsInitialized())
    PyEval_InitThreads();
#endif
}
}  // namespace
// Starts the embedded Python runtime. The statements below are order dependent:
//   1. Py_Initialize() starts the interpreter,
//   2. initThreads() performs the manual thread initialization that old Python versions need,
//   3. PyInit_minifi_native() runs the init function of the minifi_native module
//      (NOTE(review): presumably this is what exposes the C++ bindings to scripts - confirm in PythonBindings.h),
//   4. PyEval_SaveThread() releases the GIL held by the current thread; the returned thread state
//      is kept in saved_thread_state_ so that the destructor can restore it.
Interpreter::Interpreter() {
  Py_Initialize();
  initThreads();
  PyInit_minifi_native();
  saved_thread_state_ = PyEval_SaveThread();
}
// Shuts the embedded Python runtime down: the thread state saved by the constructor is restored first
// (PyEval_RestoreThread re-acquires the GIL for this thread), then Py_Finalize() is called.
Interpreter::~Interpreter() {
  PyEval_RestoreThread(saved_thread_state_);
  Py_Finalize();
}
// Creates the dictionary that holds this engine's bindings.
PythonScriptEngine::PythonScriptEngine() {
  // Called only for its side effect: the static Interpreter is constructed on the first call.
  static_cast<void>(Interpreter::getInterpreter());
  // The GIL guard stays alive while the dictionary is created.
  GlobalInterpreterLock gil_guard;
  bindings_ = OwnedDict::create();
}
// Resets the reference held by bindings_ while a GIL guard is alive.
PythonScriptEngine::~PythonScriptEngine() {
  GlobalInterpreterLock gil_guard;
  bindings_.resetReference();
}
void PythonScriptEngine::eval(const std::string& script) {
  GlobalInterpreterLock gil;
  try {
    evaluateModuleImports();
    evalInternal(script);
  } catch (const std::exception& e) {
    throw PythonScriptException(e.what());
  }
}
void PythonScriptEngine::evalFile(const std::filesystem::path& file_name) {
  GlobalInterpreterLock gil;
  try {
    evaluateModuleImports();
    std::ifstream file(file_name, std::ios::in);
    if (!file.is_open()) {
      throw PythonScriptException(fmt::format("Couldn't open {}", file_name.string()));
    }
    std::string content{std::istreambuf_iterator<char>(file), std::istreambuf_iterator<char>()};
    auto compiled_string = OwnedObject(Py_CompileString(content.c_str(), file_name.string().c_str(), Py_file_input));
    if (!compiled_string.get()) {
      throw PyException();
    }
    const auto result = OwnedObject(PyEval_EvalCode(compiled_string.get(), bindings_.get(), bindings_.get()));
    if (!result.get()) {
      throw PyException();
    }
  } catch (const std::exception &e) {
    throw PythonScriptException(e.what());
  }
}
// Wraps the processor in a PythonProcessor and invokes the script's "onInitialize" through call().
// Only a weak_ptr is handed over; the owning shared_ptr is local, so the wrapper is destroyed on return.
void PythonScriptEngine::onInitialize(core::Processor* proc) {
  const auto py_processor = std::make_shared<python::PythonProcessor>(proc);
  call("onInitialize", std::weak_ptr<python::PythonProcessor>(py_processor));
}
// Wraps the processor in a PythonProcessor and invokes the script's "describe" through callRequiredFunction().
// Only a weak_ptr is handed over; the owning shared_ptr is local, so the wrapper is destroyed on return.
void PythonScriptEngine::describe(core::Processor* proc) {
  const auto py_processor = std::make_shared<python::PythonProcessor>(proc);
  callRequiredFunction("describe", std::weak_ptr<python::PythonProcessor>(py_processor));
}
// Invokes the script's "onSchedule" through call(), handing over the process context as a non-owning weak_ptr.
void PythonScriptEngine::onSchedule(const std::shared_ptr<core::ProcessContext> &context) {
  call("onSchedule", std::weak_ptr<core::ProcessContext>(context));
}
// Wraps the session in a PyProcessSession and invokes the script's "onTrigger" through call().
// Both the context and the wrapped session are handed over as weak_ptrs; the session wrapper is
// owned by a local shared_ptr and is therefore destroyed on return.
void PythonScriptEngine::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
  const auto wrapped_session = std::make_shared<python::PyProcessSession>(session);
  call("onTrigger",
       std::weak_ptr<core::ProcessContext>(context),
       std::weak_ptr<python::PyProcessSession>(wrapped_session));
}
// Binds the logger and the two relationships under the names "log", "REL_SUCCESS" and "REL_FAILURE".
// The logger is bound as a weak_ptr, i.e. the binding does not share ownership of it.
// NOTE(review): bind() is not visible here; presumably it stores the values in bindings_ so that
// scripts can reference them by these names - confirm in the header.
void PythonScriptEngine::initialize(const core::Relationship& success, const core::Relationship& failure, const std::shared_ptr<core::logging::Logger>& logger) {
  bind("log", std::weak_ptr<core::logging::Logger>(logger));
  bind("REL_SUCCESS", success);
  bind("REL_FAILURE", failure);
}
void PythonScriptEngine::evalInternal(std::string_view script) {
  const auto script_file = "# -*- coding: utf-8 -*-\n" + std::string(script);
  auto compiled_string = OwnedObject(Py_CompileString(script_file.c_str(), "<string>", Py_file_input));
  if (!compiled_string.get()) {
    throw PyException();
  }
  const auto result = OwnedObject(PyEval_EvalCode(compiled_string.get(), bindings_.get(), bindings_.get()));
  if (!result.get()) {
    throw PyException();
  }
}
namespace {
// Renders text as a single-quoted, non-raw Python string literal.
// Backslashes, single quotes and line breaks are escaped, so the literal always evaluates to exactly
// the given text. A raw literal (r'...') cannot do this: it cannot contain an unescaped quote and
// cannot end in an odd number of backslashes.
std::string toPythonStringLiteral(std::string_view text) {
  std::string literal;
  literal.reserve(text.size() + 2);
  literal.push_back('\'');
  for (const char ch : text) {
    switch (ch) {
      case '\\': literal += "\\\\"; break;
      case '\'': literal += "\\'"; break;
      case '\n': literal += "\\n"; break;
      case '\r': literal += "\\r"; break;
      default: literal.push_back(ch); break;
    }
  }
  literal.push_back('\'');
  return literal;
}
}  // namespace

// Puts the "builtins" module into bindings_ under "__builtins__" and, if any module paths are configured,
// imports sys into the bindings and makes sure each module directory is present in sys.path.
// For a module path that is a regular file its parent directory is used, otherwise the path itself.
void PythonScriptEngine::evaluateModuleImports() {
  bindings_.put("__builtins__", OwnedObject(PyImport_ImportModule("builtins")));
  if (module_paths_.empty()) {
    return;
  }
  // Kept as an evaluated statement (not a C API call) because it also binds the name "sys" in bindings_.
  evalInternal("import sys");
  for (const auto& module_path : module_paths_) {
    const std::filesystem::path import_dir =
        std::filesystem::is_regular_file(module_path) ? module_path.parent_path() : module_path;
    // The path is escaped so that quotes or a trailing backslash in it cannot break (or inject into) the statement.
    const std::string path_literal = toPythonStringLiteral(import_dir.string());
    // This function runs on every eval()/evalFile() and sys.path belongs to the shared interpreter,
    // so only append entries that are not there yet instead of accumulating duplicates.
    evalInternal("if " + path_literal + " not in sys.path: sys.path.append(" + path_literal + ")\n");
  }
}
}  // namespace org::apache::nifi::minifi::extensions::python