cpp/velox/jni/VeloxJniWrapper.cc (391 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <jni.h> #include <glog/logging.h> #include <jni/JniCommon.h> #include <exception> #include "JniUdf.h" #include "compute/Runtime.h" #include "compute/VeloxBackend.h" #include "compute/VeloxRuntime.h" #include "config/GlutenConfig.h" #include "jni/JniError.h" #include "jni/JniFileSystem.h" #include "memory/VeloxColumnarBatch.h" #include "memory/VeloxMemoryManager.h" #include "substrait/SubstraitToVeloxPlanValidator.h" #include "utils/ObjectStore.h" #include "utils/VeloxBatchResizer.h" #include "velox/common/base/BloomFilter.h" #include "velox/common/file/FileSystems.h" #include <iostream> using namespace gluten; using namespace facebook; namespace { jclass infoCls; jmethodID infoClsInitMethod; jclass blockStripesClass; jmethodID blockStripesConstructor; } // namespace #ifdef __cplusplus extern "C" { #endif jint JNI_OnLoad(JavaVM* vm, void*) { JNIEnv* env; if (vm->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) { return JNI_ERR; } getJniCommonState()->ensureInitialized(env); getJniErrorState()->ensureInitialized(env); initVeloxJniFileSystem(env); initVeloxJniUDF(env); infoCls = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/validate/NativePlanValidationInfo;"); infoClsInitMethod = env->GetMethodID(infoCls, "<init>", "(ILjava/lang/String;)V"); blockStripesClass = createGlobalClassReferenceOrError(env, "Lorg/apache/spark/sql/execution/datasources/BlockStripes;"); blockStripesConstructor = env->GetMethodID(blockStripesClass, "<init>", "(J[J[II[B)V"); DLOG(INFO) << "Loaded Velox backend."; return jniVersion; } void JNI_OnUnload(JavaVM* vm, void*) { JNIEnv* env; vm->GetEnv(reinterpret_cast<void**>(&env), jniVersion); env->DeleteGlobalRef(blockStripesClass); env->DeleteGlobalRef(infoCls); finalizeVeloxJniUDF(env); finalizeVeloxJniFileSystem(env); getJniErrorState()->close(); getJniCommonState()->close(); google::ShutdownGoogleLogging(); } JNIEXPORT void JNICALL Java_org_apache_gluten_init_NativeBackendInitializer_initialize( // NOLINT JNIEnv* env, jclass, jobject jListener, jbyteArray conf) { JNI_METHOD_START JavaVM* vm; if (env->GetJavaVM(&vm) != JNI_OK) { throw GlutenException("Unable to get JavaVM instance"); } auto safeArray = getByteArrayElementsSafe(env, conf); // Create a global allocation listener that reserves global off-heap memory from Java-side GlobalOffHeapMemory utility // class. std::unique_ptr<AllocationListener> listener = std::make_unique<SparkAllocationListener>(vm, jListener); auto sparkConf = parseConfMap(env, safeArray.elems(), safeArray.length()); VeloxBackend::create(std::move(listener), sparkConf); JNI_METHOD_END() } JNIEXPORT void JNICALL Java_org_apache_gluten_init_NativeBackendInitializer_shutdown( // NOLINT JNIEnv* env, jclass) { JNI_METHOD_START VeloxBackend::get()->tearDown(); JNI_METHOD_END() } JNIEXPORT void JNICALL Java_org_apache_gluten_udf_UdfJniWrapper_registerFunctionSignatures( // NOLINT JNIEnv* env, jclass) { JNI_METHOD_START jniRegisterFunctionSignatures(env); JNI_METHOD_END() } JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeValidateWithFailureReason( // NOLINT JNIEnv* env, jobject wrapper, jbyteArray planArray) { JNI_METHOD_START const auto ctx = getRuntime(env, wrapper); const auto safeArray = getByteArrayElementsSafe(env, planArray); const auto planData = safeArray.elems(); const auto planSize = env->GetArrayLength(planArray); const auto runtime = dynamic_cast<VeloxRuntime*>(ctx); if (runtime->debugModeEnabled()) { try { const auto jsonPlan = substraitFromPbToJson("Plan", planData, planSize); LOG(INFO) << std::string(50, '#') << " received substrait::Plan: for validation"; LOG(INFO) << jsonPlan; } catch (const std::exception& e) { LOG(WARNING) << "Error converting Substrait plan for validation to JSON: " << e.what(); } } const auto pool = defaultLeafVeloxMemoryPool().get(); SubstraitToVeloxPlanValidator planValidator(pool); ::substrait::Plan subPlan; parseProtobuf(planData, planSize, &subPlan); try { const auto isSupported = planValidator.validate(subPlan); const auto logs = planValidator.getValidateLog(); std::string concatLog; for (int i = 0; i < logs.size(); i++) { concatLog += logs[i] + "@"; } return env->NewObject(infoCls, infoClsInitMethod, isSupported, env->NewStringUTF(concatLog.c_str())); } catch (std::invalid_argument& e) { LOG(INFO) << "Failed to validate substrait plan because " << e.what(); return env->NewObject(infoCls, infoClsInitMethod, false, env->NewStringUTF("")); } JNI_METHOD_END(nullptr) } JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_VeloxColumnarBatchJniWrapper_from( // NOLINT JNIEnv* env, jobject wrapper, jlong handle) { JNI_METHOD_START auto ctx = getRuntime(env, wrapper); auto runtime = dynamic_cast<VeloxRuntime*>(ctx); auto batch = ObjectStore::retrieve<ColumnarBatch>(handle); auto newBatch = VeloxColumnarBatch::from(runtime->memoryManager()->getLeafMemoryPool().get(), batch); return ctx->saveObject(newBatch); JNI_METHOD_END(kInvalidObjectHandle) } JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_VeloxColumnarBatchJniWrapper_compose( // NOLINT JNIEnv* env, jobject wrapper, jlongArray batchHandles) { JNI_METHOD_START auto ctx = getRuntime(env, wrapper); auto runtime = dynamic_cast<VeloxRuntime*>(ctx); int handleCount = env->GetArrayLength(batchHandles); auto safeArray = getLongArrayElementsSafe(env, batchHandles); std::vector<std::shared_ptr<ColumnarBatch>> batches; for (int i = 0; i < handleCount; ++i) { int64_t handle = safeArray.elems()[i]; auto batch = ObjectStore::retrieve<ColumnarBatch>(handle); batches.push_back(batch); } auto newBatch = VeloxColumnarBatch::compose(runtime->memoryManager()->getLeafMemoryPool().get(), std::move(batches)); return ctx->saveObject(newBatch); JNI_METHOD_END(kInvalidObjectHandle) } JNIEXPORT jlong JNICALL Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapper_empty( // NOLINT JNIEnv* env, jobject wrapper, jint capacity) { JNI_METHOD_START auto ctx = getRuntime(env, wrapper); auto filter = std::make_shared<velox::BloomFilter<std::allocator<uint64_t>>>(); filter->reset(capacity); GLUTEN_CHECK(filter->isSet(), "Bloom-filter is not initialized"); return ctx->saveObject(filter); JNI_METHOD_END(kInvalidObjectHandle) } JNIEXPORT jlong JNICALL Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapper_init( // NOLINT JNIEnv* env, jobject wrapper, jbyteArray data) { JNI_METHOD_START auto safeArray = getByteArrayElementsSafe(env, data); auto ctx = getRuntime(env, wrapper); auto filter = std::make_shared<velox::BloomFilter<std::allocator<uint64_t>>>(); uint8_t* serialized = safeArray.elems(); filter->merge(reinterpret_cast<char*>(serialized)); return ctx->saveObject(filter); JNI_METHOD_END(kInvalidObjectHandle) } JNIEXPORT void JNICALL Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapper_insertLong( // NOLINT JNIEnv* env, jobject wrapper, jlong handle, jlong item) { JNI_METHOD_START auto filter = ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(handle); GLUTEN_CHECK(filter->isSet(), "Bloom-filter is not initialized"); filter->insert(folly::hasher<int64_t>()(item)); JNI_METHOD_END() } JNIEXPORT jboolean JNICALL Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapper_mightContainLong( // NOLINT JNIEnv* env, jobject wrapper, jlong handle, jlong item) { JNI_METHOD_START auto filter = ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(handle); GLUTEN_CHECK(filter->isSet(), "Bloom-filter is not initialized"); bool out = filter->mayContain(folly::hasher<int64_t>()(item)); return out; JNI_METHOD_END(false) } namespace { static std::vector<char> serialize(BloomFilter<std::allocator<uint64_t>>* bf) { uint32_t size = bf->serializedSize(); std::vector<char> buffer; buffer.reserve(size); char* data = buffer.data(); bf->serialize(data); return buffer; } } // namespace JNIEXPORT void JNICALL Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapper_mergeFrom( // NOLINT JNIEnv* env, jobject wrapper, jlong handle, jlong other) { JNI_METHOD_START auto to = ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(handle); auto from = ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(other); GLUTEN_CHECK(to->isSet(), "Bloom-filter is not initialized"); GLUTEN_CHECK(from->isSet(), "Bloom-filter is not initialized"); std::vector<char> serialized = serialize(from.get()); to->merge(serialized.data()); JNI_METHOD_END() } JNIEXPORT jbyteArray JNICALL Java_org_apache_gluten_utils_VeloxBloomFilterJniWrapper_serialize( // NOLINT JNIEnv* env, jobject wrapper, jlong handle) { JNI_METHOD_START auto filter = ObjectStore::retrieve<velox::BloomFilter<std::allocator<uint64_t>>>(handle); GLUTEN_CHECK(filter->isSet(), "Bloom-filter is not initialized"); std::vector<char> buffer = serialize(filter.get()); auto size = buffer.capacity(); jbyteArray out = env->NewByteArray(size); env->SetByteArrayRegion(out, 0, size, reinterpret_cast<jbyte*>(buffer.data())); return out; JNI_METHOD_END(nullptr) } JNIEXPORT jlong JNICALL Java_org_apache_gluten_utils_VeloxBatchResizerJniWrapper_create( // NOLINT JNIEnv* env, jobject wrapper, jint minOutputBatchSize, jint maxOutputBatchSize, jobject jIter) { JNI_METHOD_START auto ctx = getRuntime(env, wrapper); auto pool = dynamic_cast<VeloxMemoryManager*>(ctx->memoryManager())->getLeafMemoryPool(); auto iter = makeJniColumnarBatchIterator(env, jIter, ctx); auto appender = std::make_shared<ResultIterator>( std::make_unique<VeloxBatchResizer>(pool.get(), minOutputBatchSize, maxOutputBatchSize, std::move(iter))); return ctx->saveObject(appender); JNI_METHOD_END(kInvalidObjectHandle) } JNIEXPORT jboolean JNICALL Java_org_apache_gluten_utils_VeloxFileSystemValidationJniWrapper_allSupportedByRegisteredFileSystems( // NOLINT JNIEnv* env, jclass, jobjectArray stringArray) { JNI_METHOD_START int size = env->GetArrayLength(stringArray); for (int i = 0; i < size; i++) { jstring string = (jstring)(env->GetObjectArrayElement(stringArray, i)); std::string path = jStringToCString(env, string); if (!velox::filesystems::isPathSupportedByRegisteredFileSystems(path)) { return false; } } return true; JNI_METHOD_END(false) } JNIEXPORT jlong JNICALL Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapper_init( // NOLINT JNIEnv* env, jobject wrapper, jstring filePath, jlong cSchema, jbyteArray options) { JNI_METHOD_START auto ctx = gluten::getRuntime(env, wrapper); auto runtime = dynamic_cast<VeloxRuntime*>(ctx); ObjectHandle handle = kInvalidObjectHandle; if (cSchema == -1) { // Only inspect the schema and not write handle = ctx->saveObject(runtime->createDataSource(jStringToCString(env, filePath), nullptr)); } else { auto safeArray = gluten::getByteArrayElementsSafe(env, options); auto datasourceOptions = gluten::parseConfMap(env, safeArray.elems(), safeArray.length()); auto& sparkConf = ctx->getConfMap(); datasourceOptions.insert(sparkConf.begin(), sparkConf.end()); auto schema = gluten::arrowGetOrThrow(arrow::ImportSchema(reinterpret_cast<struct ArrowSchema*>(cSchema))); handle = ctx->saveObject(runtime->createDataSource(jStringToCString(env, filePath), schema)); auto datasource = ObjectStore::retrieve<VeloxDataSource>(handle); datasource->init(datasourceOptions); } return handle; JNI_METHOD_END(kInvalidObjectHandle) } JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapper_inspectSchema( // NOLINT JNIEnv* env, jobject wrapper, jlong dsHandle, jlong cSchema) { JNI_METHOD_START auto datasource = ObjectStore::retrieve<VeloxDataSource>(dsHandle); datasource->inspectSchema(reinterpret_cast<struct ArrowSchema*>(cSchema)); JNI_METHOD_END() } JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapper_close( // NOLINT JNIEnv* env, jobject wrapper, jlong dsHandle) { JNI_METHOD_START auto datasource = ObjectStore::retrieve<VeloxDataSource>(dsHandle); datasource->close(); ObjectStore::release(dsHandle); JNI_METHOD_END() } JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapper_writeBatch( // NOLINT JNIEnv* env, jobject wrapper, jlong dsHandle, jlong batchHandle) { JNI_METHOD_START auto datasource = ObjectStore::retrieve<VeloxDataSource>(dsHandle); auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle); datasource->write(batch); JNI_METHOD_END() } JNIEXPORT jobject JNICALL Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapper_splitBlockByPartitionAndBucket( // NOLINT JNIEnv* env, jobject wrapper, jlong batchHandle, jintArray partitionColIndice, jboolean hasBucket, jlong memoryManagerId) { JNI_METHOD_START auto ctx = gluten::getRuntime(env, wrapper); auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle); auto safeArray = gluten::getIntArrayElementsSafe(env, partitionColIndice); int size = env->GetArrayLength(partitionColIndice); std::vector<int32_t> partitionColIndiceVec; for (int i = 0; i < size; ++i) { partitionColIndiceVec.push_back(safeArray.elems()[i]); } auto result = batch->toUnsafeRow(0); auto rowBytes = result.data(); auto newBatchHandle = ctx->saveObject(ctx->select(batch, partitionColIndiceVec)); auto bytesSize = result.size(); jbyteArray bytesArray = env->NewByteArray(bytesSize); env->SetByteArrayRegion(bytesArray, 0, bytesSize, reinterpret_cast<jbyte*>(rowBytes)); jlongArray batchArray = env->NewLongArray(1); long* cBatchArray = new long[1]; cBatchArray[0] = newBatchHandle; env->SetLongArrayRegion(batchArray, 0, 1, cBatchArray); delete[] cBatchArray; jobject blockStripes = env->NewObject( blockStripesClass, blockStripesConstructor, batchHandle, batchArray, nullptr, batch->numColumns(), bytesArray); return blockStripes; JNI_METHOD_END(nullptr) } JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_VeloxColumnarBatchJniWrapper_slice( // NOLINT JNIEnv* env, jobject wrapper, jlong veloxBatchHandle, jint offset, jint limit) { JNI_METHOD_START auto ctx = getRuntime(env, wrapper); auto batch = ObjectStore::retrieve<ColumnarBatch>(veloxBatchHandle); auto numRows = batch->numRows(); if (limit >= numRows) { return veloxBatchHandle; } auto veloxBatch = std::dynamic_pointer_cast<VeloxColumnarBatch>(batch); VELOX_CHECK_NOT_NULL(veloxBatch, "Expected VeloxColumnarBatch but got a different type."); auto rowVector = veloxBatch->getRowVector(); auto prunedVector = rowVector->slice(offset, limit); auto prunedRowVector = std::dynamic_pointer_cast<facebook::velox::RowVector>(prunedVector); VELOX_CHECK_NOT_NULL(prunedRowVector, "Expected RowVector but got a different type."); auto prunedBatch = std::make_shared<VeloxColumnarBatch>(prunedRowVector); jlong prunedHandle = ctx->saveObject(prunedBatch); return prunedHandle; JNI_METHOD_END(kInvalidObjectHandle) } #ifdef __cplusplus } #endif