extensions/jni/jvm/JniProcessSession.cpp (301 lines of code) (raw):

/** * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "JniProcessSession.h" #include <memory> #include <string> #include <algorithm> #include <set> #include <utility> #include "utils/StringUtils.h" #include "utils/file/FileUtils.h" #include "JVMLoader.h" #include "JniReferenceObjects.h" #include "JniFlowFile.h" #include "../JavaException.h" #ifdef __cplusplus extern "C" { #endif namespace minifi = org::apache::nifi::minifi; JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessSession_create(JNIEnv *env, jobject obj) { if (obj == nullptr) { return nullptr; } minifi::jni::JniSession *session = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj); auto ff = minifi::jni::JVMLoader::getInstance()->load_class("org/apache/nifi/processor/JniFlowFile", env); auto ff_instance = ff.newInstance(env); minifi::jni::ThrowIf(env); auto flow_file = session->getSession()->create(); auto flow = std::make_shared<minifi::jni::JniFlowFile>(flow_file, session->getServicer(), ff_instance); flow_file->addReference(flow); auto rawFlow = session->addFlowFile(flow); minifi::jni::JVMLoader::getInstance()->setReference(ff_instance, env, rawFlow); return ff_instance; } JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessSession_readFlowFile(JNIEnv *env, jobject obj, jobject ff) { if (obj == nullptr) { return nullptr; } THROW_IF_NULL(ff, env, NO_FF_OBJECT); minifi::jni::JniSession *session = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj); minifi::jni::JniFlowFile *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, ff); if (ptr->get()) { auto jincls = minifi::jni::JVMLoader::getInstance()->load_class("org/apache/nifi/processor/JniInputStream", env); auto jin = jincls.newInstance(env); minifi::jni::ThrowIf(env); auto callback = std::make_unique<minifi::jni::JniByteInputStream>(4096); session->getSession()->read(ptr->get(), std::ref(*callback)); auto jniInpuStream = std::make_shared<minifi::jni::JniInputStream>(std::move(callback), jin, session->getServicer()); session->addInputStream(jniInpuStream); minifi::jni::JVMLoader::getInstance()->setReference(jin, env, jniInpuStream.get()); return jin; } return nullptr; } JNIEXPORT jint JNICALL Java_org_apache_nifi_processor_JniInputStream_read(JNIEnv *env, jobject obj) { minifi::jni::JniInputStream *jin = minifi::jni::JVMLoader::getPtr<minifi::jni::JniInputStream>(env, obj); if (obj == nullptr) { // this technically can't happen per JNI specs return -1; } uint8_t value = 0; if (jin->read(value) > 0) return value; else return -1; } JNIEXPORT jint JNICALL Java_org_apache_nifi_processor_JniInputStream_readWithOffset(JNIEnv *env, jobject obj, jbyteArray arr, jint offset, jint length) { minifi::jni::JniInputStream *jin = minifi::jni::JVMLoader::getPtr<minifi::jni::JniInputStream>(env, obj); if (obj == nullptr) { // this technically can't happen per JNI specs return -1; } return jin->read(env, arr, static_cast<int>(offset), static_cast<int>(length)); } JNIEXPORT jboolean JNICALL Java_org_apache_nifi_processor_JniProcessSession_write(JNIEnv *env, jobject obj, jobject ff, jbyteArray byteArray) { if (obj == nullptr) { return false; } THROW_IF((ff == nullptr || byteArray == nullptr), env, "No flowfile to write"); minifi::jni::JniSession *session = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj); minifi::jni::JniFlowFile *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, ff); if (ptr->get()) { jbyte* buffer = env->GetByteArrayElements(byteArray, 0); jsize length = env->GetArrayLength(byteArray); if (length > 0) { session->getSession()->writeBuffer(ptr->get(), gsl::make_span(reinterpret_cast<std::byte*>(buffer), gsl::narrow<size_t>(length))); } env->ReleaseByteArrayElements(byteArray, buffer, 0); return true; } return false; } JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessSession_clone(JNIEnv *env, jobject obj, jobject prevff) { if (obj == nullptr) { return nullptr; } THROW_IF_NULL(prevff, env, NO_FF_OBJECT); minifi::jni::JniSession *session = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj); minifi::jni::JniFlowFile *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, prevff); if (ptr->get()) { auto ff = minifi::jni::JVMLoader::getInstance()->load_class("org/apache/nifi/processor/JniFlowFile", env); auto ff_instance = ff.newInstance(env); minifi::jni::ThrowIf(env); auto flow_file = session->getSession()->clone(ptr->get()); auto flow = std::make_shared<minifi::jni::JniFlowFile>(flow_file, session->getServicer(), ff_instance); flow_file->addReference(flow); auto rawFlow = session->addFlowFile(flow); minifi::jni::JVMLoader::getInstance()->setReference(ff_instance, env, rawFlow); return ff_instance; } return nullptr; } JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessSession_get(JNIEnv *env, jobject obj) { if (obj == nullptr) return nullptr; minifi::jni::JniSession *session = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj); if (session == nullptr) return nullptr; auto flow_file = session->getSession()->get(); auto prevFF = session->getFlowFileReference(flow_file); if (prevFF != nullptr) { return prevFF->getJniReference(); } // otherwise create one auto ff = minifi::jni::JVMLoader::getInstance()->load_class("org/apache/nifi/processor/JniFlowFile"); auto ff_instance = ff.newInstance(env); if (flow_file == nullptr || ff_instance == nullptr) { // this is an acceptable condition. return nullptr; } auto flow = std::make_shared<minifi::jni::JniFlowFile>(flow_file, session->getServicer(), ff_instance); flow_file->addReference(flow); auto rawFlow = session->addFlowFile(flow); minifi::jni::JVMLoader::getInstance()->setReference(ff_instance, env, rawFlow); return ff_instance; } JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessSession_putAttribute(JNIEnv *env, jobject obj, jobject ff, jstring key, jstring value) { if (obj == nullptr) { // does not mean an error should be thrown, rather we will let // Java do its thing as this is a condition of GC most likely return nullptr; } THROW_IF_NULL(ff, env, NO_FF_OBJECT); minifi::jni::JniFlowFile *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, ff); if (ff == nullptr || key == nullptr || value == nullptr) { return nullptr; } auto resKey = JniStringToUTF(env, key); auto resValue = JniStringToUTF(env, value); if (!ptr->get()->addAttribute(resKey, resValue)) { if (resKey != minifi::core::SpecialFlowAttribute::UUID) { // don't update the attribute uuid ptr->get()->updateAttribute(resKey, resValue); } } return ff; } JNIEXPORT void JNICALL Java_org_apache_nifi_processor_JniProcessSession_transfer(JNIEnv *env, jobject obj, jobject ff, jstring relationship) { if (obj == nullptr) { return; } THROW_IF_NULL(ff, env, NO_FF_OBJECT); minifi::jni::JniSession *session = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj); minifi::jni::JniFlowFile *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, ff); minifi::core::Relationship rel(JniStringToUTF(env, relationship), "description"); session->getSession()->transfer(ptr->get(), rel); } jstring Java_org_apache_nifi_processor_JniProcessSession_getPropertyValue(JNIEnv *env, jobject obj, jstring propertyName) { std::string value; if (obj == nullptr) { return env->NewStringUTF(value.c_str()); } minifi::core::ProcessContext *context = minifi::jni::JVMLoader::getPtr<minifi::core::ProcessContext>(env, obj); std::string keystr = JniStringToUTF(env, propertyName); if (!context->getProperty(keystr, value)) { context->getDynamicProperty(keystr, value); } return env->NewStringUTF(value.c_str()); } JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessSession_createWithParent(JNIEnv *env, jobject obj, jobject parent) { if (obj == nullptr) { // does not mean an error should be thrown, rather we will let // Java do its thing as this is a condition of GC most likely return nullptr; } THROW_IF_NULL(parent, env, NO_FF_OBJECT); minifi::jni::JniSession *session = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj); minifi::jni::JniFlowFile *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, parent); if (ptr->get()) { auto ff = minifi::jni::JVMLoader::getInstance()->load_class("org/apache/nifi/processor/JniFlowFile", env); auto ff_instance = ff.newInstance(env); minifi::jni::ThrowIf(env); auto flow_file = session->getSession()->create(ptr->get()); auto flow = std::make_shared<minifi::jni::JniFlowFile>(flow_file, session->getServicer(), ff_instance); flow_file->addReference(flow); auto rawFlow = session->addFlowFile(flow); minifi::jni::JVMLoader::getInstance()->setReference(ff_instance, env, rawFlow); return ff_instance; } return nullptr; } JNIEXPORT void JNICALL Java_org_apache_nifi_processor_JniProcessSession_commit(JNIEnv *env, jobject obj) { if (obj == nullptr) { // does not mean an error should be thrown, rather we will let // Java do its thing as this is a condition of GC most likely return; } minifi::jni::JniSession *session = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj); try { session->getSession()->commit(); } catch (const std::exception &e) { std::string error = "error while committing: "; error += e.what(); minifi::jni::ThrowJava(env, error.c_str()); } catch (...) { minifi::jni::ThrowJava(env, "error while commiting"); } } JNIEXPORT void JNICALL Java_org_apache_nifi_processor_JniProcessSession_rollback(JNIEnv *env, jobject obj) { if (obj == nullptr) { // does not mean an error should be thrown, rather we will let // Java do its thing as this is a condition of GC most likely return; } minifi::jni::JniSession *session = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj); session->getSession()->rollback(); } JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessSessionFactory_createSession(JNIEnv *env, jobject obj) { if (obj == nullptr) { // does not mean an error should be thrown, rather we will let // Java do its thing as this is a condition of GC most likely return nullptr; } minifi::jni::JniSessionFactory *sessionFactory = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSessionFactory>(env, obj); auto session_class = minifi::jni::JVMLoader::getInstance()->load_class("org/apache/nifi/processor/JniProcessSession", env); auto session_instance = session_class.newInstance(env); minifi::jni::ThrowIf(env); // create a session auto procSession = sessionFactory->getFactory()->createSession(); std::shared_ptr<minifi::jni::JniSession> session = std::make_shared<minifi::jni::JniSession>(procSession, session_instance, sessionFactory->getServicer()); // add a reference so the minifi C++ session factory knows to remove these eventually. procSession->addReference(session); auto rawSession = sessionFactory->addSession(session); // set the reference in session_instance using the raw pointer. minifi::jni::JVMLoader::getInstance()->setReference(session_instance, env, rawSession); // catalog the session return session_instance; } JNIEXPORT void JNICALL Java_org_apache_nifi_processor_JniProcessSession_remove(JNIEnv *env, jobject obj, jobject ff) { if (obj == nullptr) { // does not mean an error should be thrown, rather we will let // Java do its thing as this is a condition of GC most likely return; } THROW_IF_NULL(ff, env, NO_FF_OBJECT); minifi::jni::JniSession *session = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj); minifi::jni::JniFlowFile *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, ff); if (ptr->get()) { session->getSession()->remove(ptr->get()); } } JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessSession_penalize(JNIEnv *env, jobject obj, jobject ff) { if (obj == nullptr) { // does not mean an error should be thrown, rather we will let // Java do its thing as this is a condition of GC most likely return ff; } THROW_IF_NULL(ff, env, NO_FF_OBJECT); minifi::jni::JniSession *session = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj); minifi::jni::JniFlowFile *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, ff); if (ptr->get()) { session->getSession()->penalize(ptr->get()); } return ff; } JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessSession_removeAttribute(JNIEnv *env, jobject obj, jobject ff, jstring attr) { if (obj == nullptr) { // does not mean an error should be thrown, rather we will let // Java do its thing as this is a condition of GC most likely return ff; } THROW_IF_NULL(ff, env, NO_FF_OBJECT); minifi::jni::JniFlowFile *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, ff); if (ptr->get()) { ptr->get()->removeAttribute(JniStringToUTF(env, attr)); } return ff; } JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessSession_clonePortion(JNIEnv *env, jobject obj, jobject prevff, jlong offset, jlong size) { if (obj == nullptr) { // does not mean an error should be thrown, rather we will let // Java do its thing as this is a condition of GC most likely return prevff; } THROW_IF_NULL(prevff, env, NO_FF_OBJECT); minifi::jni::JniSession *session = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj); minifi::jni::JniFlowFile *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, prevff); if (ptr->get()) { auto ff = minifi::jni::JVMLoader::getInstance()->load_class("org/apache/nifi/processor/JniFlowFile", env); auto ff_instance = ff.newInstance(env); minifi::jni::ThrowIf(env); auto flow_file = session->getSession()->clone(ptr->get(), offset, size); auto flow = std::make_shared<minifi::jni::JniFlowFile>(flow_file, session->getServicer(), ff_instance); flow_file->addReference(flow); auto rawFlow = session->addFlowFile(flow); minifi::jni::JVMLoader::getInstance()->setReference(ff_instance, env, rawFlow); return ff_instance; } return nullptr; } JNIEXPORT jboolean JNICALL Java_org_apache_nifi_processor_JniProcessSession_append(JNIEnv *env, jobject obj, jobject ff, jbyteArray byteArray) { if (obj == nullptr) { // does not mean an error should be thrown, rather we will let // Java do its thing as this is a condition of GC most likely return false; } THROW_IF((ff == nullptr || byteArray == nullptr), env, NO_FF_OBJECT); auto *session = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj); auto *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, ff); if (ptr->get()) { jbyte* buffer = env->GetByteArrayElements(byteArray, 0); jsize length = env->GetArrayLength(byteArray); if (length > 0) { session->getSession()->appendBuffer(ptr->get(), gsl::make_span(reinterpret_cast<std::byte*>(buffer), gsl::narrow<size_t>(length))); } env->ReleaseByteArrayElements(byteArray, buffer, 0); return true; } return false; } #ifdef __cplusplus } #endif