extensions/python/types/PyProcessSession.cpp (318 lines of code) (raw):
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PyProcessSession.h"
#include <utility>
#include "PyScriptFlowFile.h"
#include "PyRelationship.h"
#include "types/PyOutputStream.h"
#include "types/PyInputStream.h"
#include "range/v3/algorithm/remove_if.hpp"
#include "utils/gsl.h"
namespace org::apache::nifi::minifi::extensions::python {
namespace core = org::apache::nifi::minifi::core;
PyProcessSession::PyProcessSession(core::ProcessSession& session)
: session_(session) {
}
std::shared_ptr<core::FlowFile> PyProcessSession::get() {
auto flow_file = session_.get();
if (flow_file == nullptr) {
return nullptr;
}
flow_files_.push_back(flow_file);
return flow_file;
}
void PyProcessSession::transfer(const std::shared_ptr<core::FlowFile>& flow_file,
const core::Relationship& relationship) {
if (!flow_file) {
throw std::runtime_error("Access of FlowFile after it has been released");
}
session_.transfer(flow_file, relationship);
}
void PyProcessSession::transferToCustomRelationship(const std::shared_ptr<core::FlowFile>& flow_file, const std::string& relationship_name) {
if (!flow_file) {
throw std::runtime_error("Access of FlowFile after it has been released");
}
session_.transferToCustomRelationship(flow_file, relationship_name);
}
void PyProcessSession::read(const std::shared_ptr<core::FlowFile>& flow_file, BorrowedObject input_stream_callback) {
if (!flow_file) {
throw std::runtime_error("Access of FlowFile after it has been released");
}
session_.read(flow_file, [&input_stream_callback](const std::shared_ptr<io::InputStream>& input_stream) -> int64_t {
return Long(Callable(input_stream_callback.getAttribute("process"))(std::weak_ptr(input_stream))).asInt64();
});
}
void PyProcessSession::write(const std::shared_ptr<core::FlowFile>& flow_file, BorrowedObject output_stream_callback) {
if (!flow_file) {
throw std::runtime_error("Access of FlowFile after it has been released");
}
session_.write(flow_file, [&output_stream_callback](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t {
return Long(Callable(output_stream_callback.getAttribute("process"))(std::weak_ptr(output_stream))).asInt64();
});
}
std::shared_ptr<core::FlowFile> PyProcessSession::create(const std::shared_ptr<core::FlowFile>& flow_file) {
auto result = session_.create(flow_file.get());
flow_files_.push_back(result);
return result;
}
std::shared_ptr<core::FlowFile> PyProcessSession::clone(const std::shared_ptr<core::FlowFile>& flow_file) {
if (!flow_file) {
throw std::runtime_error("Flow file to clone is nullptr");
}
auto result = session_.clone(*flow_file);
flow_files_.push_back(result);
return result;
}
void PyProcessSession::remove(const std::shared_ptr<core::FlowFile>& flow_file) {
session_.remove(flow_file);
flow_files_.erase(ranges::remove_if(flow_files_, [&flow_file](const auto& ff)-> bool { return ff == flow_file; }), flow_files_.end());
}
std::string PyProcessSession::getContentsAsString(const std::shared_ptr<core::FlowFile>& flow_file) {
if (!flow_file) {
throw std::runtime_error("Access of FlowFile after it has been released");
}
std::string content;
session_.read(flow_file, [&content](const std::shared_ptr<io::InputStream>& input_stream) -> int64_t {
content.resize(input_stream->size());
return gsl::narrow<int64_t>(input_stream->read(as_writable_bytes(std::span(content))));
});
return content;
}
void PyProcessSession::putAttribute(const std::shared_ptr<core::FlowFile>& flow_file, std::string_view key, const std::string& value) {
session_.putAttribute(*flow_file, key, value);
}
extern "C" {
static PyMethodDef PyProcessSessionObject_methods[] = { // NOLINT(cppcoreguidelines-avoid-c-arrays)
{"get", (PyCFunction) PyProcessSessionObject::get, METH_NOARGS, nullptr},
{"create", (PyCFunction) PyProcessSessionObject::create, METH_VARARGS, nullptr},
{"clone", (PyCFunction) PyProcessSessionObject::clone, METH_VARARGS, nullptr},
{"read", (PyCFunction) PyProcessSessionObject::read, METH_VARARGS, nullptr},
{"write", (PyCFunction) PyProcessSessionObject::write, METH_VARARGS, nullptr},
{"transfer", (PyCFunction) PyProcessSessionObject::transfer, METH_VARARGS, nullptr},
{"transferToCustomRelationship", (PyCFunction) PyProcessSessionObject::transferToCustomRelationship, METH_VARARGS, nullptr},
{"remove", (PyCFunction) PyProcessSessionObject::remove, METH_VARARGS, nullptr},
{"getContentsAsBytes", (PyCFunction) PyProcessSessionObject::getContentsAsBytes, METH_VARARGS, nullptr},
{"putAttribute", (PyCFunction) PyProcessSessionObject::putAttribute, METH_VARARGS, nullptr},
{} /* Sentinel */
};
static PyType_Slot PyProcessTypeSpecSlots[] = { // NOLINT(cppcoreguidelines-avoid-c-arrays)
{Py_tp_dealloc, reinterpret_cast<void*>(pythonAllocatedInstanceDealloc<PyProcessSessionObject>)},
{Py_tp_init, reinterpret_cast<void*>(PyProcessSessionObject::init)},
{Py_tp_methods, reinterpret_cast<void*>(PyProcessSessionObject_methods)},
{Py_tp_new, reinterpret_cast<void*>(newPythonAllocatedInstance<PyProcessSessionObject>)},
{} /* Sentinel */
};
static PyType_Spec PyProcessSessionObjectTypeSpec{
.name = "minifi_native.ProcessSession",
.basicsize = sizeof(PyProcessSessionObject),
.itemsize = 0,
.flags = Py_TPFLAGS_DEFAULT,
.slots = PyProcessTypeSpecSlots
};
int PyProcessSessionObject::init(PyProcessSessionObject* self, PyObject* args, PyObject*) {
PyObject* weak_ptr_capsule = nullptr;
if (!PyArg_ParseTuple(args, "O", &weak_ptr_capsule)) {
return -1;
}
auto process_session = PyCapsule_GetPointer(weak_ptr_capsule, HeldTypeName);
if (!process_session)
return -1;
self->process_session_ = *static_cast<std::weak_ptr<PyProcessSession>*>(process_session);
return 0;
}
PyObject* PyProcessSessionObject::get(PyProcessSessionObject* self, PyObject*) {
auto session = self->process_session_.lock();
if (!session) {
PyErr_SetString(PyExc_AttributeError, "tried reading process session outside 'on_trigger'");
return nullptr;
}
if (auto flow_file = session->get())
return object::returnReference(std::weak_ptr(flow_file));
return object::returnReference(nullptr);
}
PyObject* PyProcessSessionObject::create(PyProcessSessionObject* self, PyObject* args) {
auto session = self->process_session_.lock();
if (!session) {
PyErr_SetString(PyExc_AttributeError, "tried reading process session outside 'on_trigger'");
return nullptr;
}
std::shared_ptr<core::FlowFile> parent_flow_file;
auto arg_size = PyTuple_Size(args);
if (arg_size > 0) {
PyObject* script_flow_file = nullptr;
if (!PyArg_ParseTuple(args, "O!", PyScriptFlowFile::typeObject(), &script_flow_file)) {
return nullptr;
}
parent_flow_file = reinterpret_cast<PyScriptFlowFile*>(script_flow_file)->script_flow_file_.lock();
}
if (auto flow_file = session->create(parent_flow_file))
return object::returnReference(std::weak_ptr(flow_file));
return object::returnReference(nullptr);
}
PyObject* PyProcessSessionObject::clone(PyProcessSessionObject* self, PyObject* args) {
auto session = self->process_session_.lock();
if (!session) {
PyErr_SetString(PyExc_AttributeError, "tried reading process session outside 'on_trigger'");
return nullptr;
}
PyObject* script_flow_file = nullptr;
if (!PyArg_ParseTuple(args, "O!", PyScriptFlowFile::typeObject(), &script_flow_file)) {
return nullptr;
}
const auto flow_file = reinterpret_cast<PyScriptFlowFile*>(script_flow_file)->script_flow_file_.lock();
if (auto cloned_flow_file = session->clone(flow_file))
return object::returnReference(std::weak_ptr(cloned_flow_file));
return object::returnReference(nullptr);
}
PyObject* PyProcessSessionObject::remove(PyProcessSessionObject* self, PyObject* args) {
auto session = self->process_session_.lock();
if (!session) {
PyErr_SetString(PyExc_AttributeError, "tried reading process session outside 'on_trigger'");
return nullptr;
}
PyObject* script_flow_file = nullptr;
if (!PyArg_ParseTuple(args, "O!", PyScriptFlowFile::typeObject(), &script_flow_file)) {
return nullptr;
}
const auto flow_file = reinterpret_cast<PyScriptFlowFile*>(script_flow_file)->script_flow_file_.lock();
session->remove(flow_file);
Py_RETURN_NONE;
}
PyObject* PyProcessSessionObject::read(PyProcessSessionObject* self, PyObject* args) {
auto session = self->process_session_.lock();
if (!session) {
PyErr_SetString(PyExc_AttributeError, "tried reading process session outside 'on_trigger'");
return nullptr;
}
PyObject* script_flow_file = nullptr;
PyObject* callback = nullptr;
if (!PyArg_ParseTuple(args, "O!O", PyScriptFlowFile::typeObject(), &script_flow_file, &callback)) {
return nullptr;
}
const auto flow_file = reinterpret_cast<PyScriptFlowFile*>(script_flow_file)->script_flow_file_.lock();
if (!flow_file) {
PyErr_SetString(PyExc_AttributeError, "tried reading FlowFile outside 'on_trigger'");
return nullptr;
}
session->read(flow_file, BorrowedObject(callback));
Py_RETURN_NONE;
}
PyObject* PyProcessSessionObject::write(PyProcessSessionObject* self, PyObject* args) {
auto session = self->process_session_.lock();
if (!session) {
PyErr_SetString(PyExc_AttributeError, "tried reading process session outside 'on_trigger'");
return nullptr;
}
PyObject* script_flow_file = nullptr;
PyObject* callback = nullptr;
if (!PyArg_ParseTuple(args, "O!O", PyScriptFlowFile::typeObject(), &script_flow_file, &callback)) {
return nullptr;
}
const auto flow_file = reinterpret_cast<PyScriptFlowFile*>(script_flow_file)->script_flow_file_.lock();
if (!flow_file) {
PyErr_SetString(PyExc_AttributeError, "tried reading FlowFile outside 'on_trigger'");
return nullptr;
}
session->write(flow_file, BorrowedObject(callback));
Py_RETURN_NONE;
}
PyObject* PyProcessSessionObject::transfer(PyProcessSessionObject* self, PyObject* args) {
auto session = self->process_session_.lock();
if (!session) {
PyErr_SetString(PyExc_AttributeError, "tried reading process session outside 'on_trigger'");
return nullptr;
}
PyObject* script_flow_file = nullptr;
PyObject* relationship = nullptr;
if (!PyArg_ParseTuple(args, "O!O!", PyScriptFlowFile::typeObject(), &script_flow_file, PyRelationship::typeObject(), &relationship)) {
return nullptr;
}
const auto flow_file = reinterpret_cast<PyScriptFlowFile*>(script_flow_file)->script_flow_file_.lock();
if (!flow_file) {
PyErr_SetString(PyExc_AttributeError, "tried reading FlowFile outside 'on_trigger'");
return nullptr;
}
session->transfer(flow_file, reinterpret_cast<PyRelationship*>(relationship)->relationship_);
Py_RETURN_NONE;
}
PyObject* PyProcessSessionObject::transferToCustomRelationship(PyProcessSessionObject* self, PyObject* args) {
auto session = self->process_session_.lock();
if (!session) {
PyErr_SetString(PyExc_AttributeError, "tried reading process session outside 'on_trigger'");
return nullptr;
}
PyObject* script_flow_file = nullptr;
const char* relationship_name = nullptr;
if (!PyArg_ParseTuple(args, "O!s", PyScriptFlowFile::typeObject(), &script_flow_file, &relationship_name)) {
return nullptr;
}
if (!relationship_name) {
PyErr_SetString(PyExc_AttributeError, "Custom relationship name is invalid!");
return nullptr;
}
std::string relationship_name_str(relationship_name);
if (relationship_name_str.empty()) {
PyErr_SetString(PyExc_AttributeError, "Custom relationship name is empty!");
return nullptr;
}
const auto flow_file = reinterpret_cast<PyScriptFlowFile*>(script_flow_file)->script_flow_file_.lock();
if (!flow_file) {
PyErr_SetString(PyExc_AttributeError, "tried reading FlowFile outside 'on_trigger'");
return nullptr;
}
BorrowedStr name = BorrowedStr::fromTuple(args, 0);
session->transferToCustomRelationship(flow_file, relationship_name_str);
Py_RETURN_NONE;
}
PyObject* PyProcessSessionObject::getContentsAsBytes(PyProcessSessionObject* self, PyObject* args) {
auto session = self->process_session_.lock();
if (!session) {
PyErr_SetString(PyExc_AttributeError, "tried reading process session outside 'on_trigger'");
return nullptr;
}
PyObject* script_flow_file = nullptr;
if (!PyArg_ParseTuple(args, "O!", PyScriptFlowFile::typeObject(), &script_flow_file)) {
return nullptr;
}
const auto flow_file = reinterpret_cast<PyScriptFlowFile*>(script_flow_file)->script_flow_file_.lock();
auto content = session->getContentsAsString(flow_file);
return PyBytes_FromStringAndSize(content.c_str(), gsl::narrow<Py_ssize_t>(content.size()));
}
PyObject* PyProcessSessionObject::putAttribute(PyProcessSessionObject* self, PyObject* args) {
auto session = self->process_session_.lock();
if (!session) {
PyErr_SetString(PyExc_AttributeError, "tried reading process session outside 'on_trigger'");
return nullptr;
}
PyObject* script_flow_file = nullptr;
const char* attribute_key = nullptr;
const char* attribute_value = nullptr;
if (!PyArg_ParseTuple(args, "O!ss", PyScriptFlowFile::typeObject(), &script_flow_file, &attribute_key, &attribute_value)) {
return nullptr;
}
const auto flow_file = reinterpret_cast<PyScriptFlowFile*>(script_flow_file)->script_flow_file_.lock();
if (!flow_file) {
PyErr_SetString(PyExc_AttributeError, "tried reading FlowFile outside 'on_trigger'");
return nullptr;
}
if (!attribute_key) {
PyErr_SetString(PyExc_AttributeError, "Attribute key is invalid!");
return nullptr;
}
std::string attribute_key_str(attribute_key);
if (attribute_key_str.empty()) {
PyErr_SetString(PyExc_AttributeError, "Attribute key is empty!");
return nullptr;
}
if (!attribute_value) {
PyErr_SetString(PyExc_AttributeError, "Attribute value is invalid!");
return nullptr;
}
std::string attribute_value_str(attribute_value);
session->putAttribute(flow_file, attribute_key_str, attribute_value_str);
Py_RETURN_NONE;
}
PyTypeObject* PyProcessSessionObject::typeObject() {
static OwnedObject PyProcessSessionObjectType{PyType_FromSpec(&PyProcessSessionObjectTypeSpec)};
return reinterpret_cast<PyTypeObject*>(PyProcessSessionObjectType.get());
}
} // extern "C"
} // namespace org::apache::nifi::minifi::extensions::python