cpp-ch/local-engine/local_engine_jni.cpp (1,135 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <numeric> #include <string> #include <jni.h> #include <Builder/SerializedPlanBuilder.h> #include <Compression/CompressedReadBuffer.h> #include <DataTypes/DataTypeNullable.h> #include <Join/BroadCastJoinBuilder.h> #include <Operator/BlockCoalesceOperator.h> #include <Parser/CHColumnToSparkRow.h> #include <Parser/MergeTreeRelParser.h> #include <Parser/RelParser.h> #include <Parser/SerializedPlanParser.h> #include <Parser/SparkRowToCHColumn.h> #include <Shuffle/CachedShuffleWriter.h> #include <Shuffle/NativeSplitter.h> #include <Shuffle/NativeWriterInMemory.h> #include <Shuffle/PartitionWriter.h> #include <Shuffle/ShuffleReader.h> #include <Shuffle/ShuffleSplitter.h> #include <Shuffle/ShuffleWriter.h> #include <Shuffle/ShuffleWriterBase.h> #include <Shuffle/WriteBufferFromJavaOutputStream.h> #include <Storages/Mergetree/SparkMergeTreeWriter.h> #include <Storages/Output/BlockStripeSplitter.h> #include <Storages/Output/FileWriterWrappers.h> #include <Storages/SubstraitSource/ReadBufferBuilder.h> #include <jni/ReservationListenerWrapper.h> #include <jni/SharedPointerWrapper.h> #include <jni/jni_common.h> #include <jni/jni_error.h> #include <Poco/Logger.h> #include <Poco/StringTokenizer.h> #include <Common/CHUtil.h> #include <Common/CurrentThread.h> #include <Common/ExceptionUtils.h> #include <Common/JNIUtils.h> #include <Common/QueryContext.h> #ifdef __cplusplus namespace DB { namespace ErrorCodes { extern const int CANNOT_PARSE_PROTOBUF_SCHEMA; } } static DB::ColumnWithTypeAndName getColumnFromColumnVector(JNIEnv * /*env*/, jobject /*obj*/, jlong block_address, jint column_position) { DB::Block * block = reinterpret_cast<DB::Block *>(block_address); return block->getByPosition(column_position); } static std::string jstring2string(JNIEnv * env, jstring jStr) { if (!jStr) return ""; jclass string_class = env->GetObjectClass(jStr); jmethodID get_bytes = env->GetMethodID(string_class, "getBytes", "(Ljava/lang/String;)[B"); jbyteArray string_jbytes = static_cast<jbyteArray>(local_engine::safeCallObjectMethod(env, jStr, get_bytes, env->NewStringUTF("UTF-8"))); size_t length = static_cast<size_t>(env->GetArrayLength(string_jbytes)); jbyte * p_bytes = env->GetByteArrayElements(string_jbytes, nullptr); std::string ret = std::string(reinterpret_cast<char *>(p_bytes), length); env->ReleaseByteArrayElements(string_jbytes, p_bytes, JNI_ABORT); env->DeleteLocalRef(string_jbytes); env->DeleteLocalRef(string_class); return ret; } static jstring stringTojstring(JNIEnv * env, const char * pat) { jclass strClass = (env)->FindClass("java/lang/String"); jmethodID ctorID = (env)->GetMethodID(strClass, "<init>", "([BLjava/lang/String;)V"); jbyteArray bytes = (env)->NewByteArray(strlen(pat)); (env)->SetByteArrayRegion(bytes, 0, strlen(pat), reinterpret_cast<const jbyte *>(pat)); jstring encoding = (env)->NewStringUTF("UTF-8"); return static_cast<jstring>((env)->NewObject(strClass, ctorID, bytes, encoding)); } extern "C" { #endif namespace dbms { class LocalExecutor; } static jclass spark_row_info_class; static jmethodID spark_row_info_constructor; static jclass block_stripes_class; static jmethodID block_stripes_constructor; static jclass split_result_class; static jmethodID split_result_constructor; static jclass native_metrics_class; static jmethodID native_metrics_constructor; JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/) { JNIEnv * env; if (vm->GetEnv(reinterpret_cast<void **>(&env), JNI_VERSION_1_8) != JNI_OK) return JNI_ERR; local_engine::JniErrorsGlobalState::instance().initialize(env); spark_row_info_class = local_engine::CreateGlobalClassReference(env, "Lio/glutenproject/row/SparkRowInfo;"); spark_row_info_constructor = local_engine::GetMethodID(env, spark_row_info_class, "<init>", "([J[JJJJ)V"); block_stripes_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/spark/sql/execution/datasources/BlockStripes;"); block_stripes_constructor = local_engine::GetMethodID(env, block_stripes_class, "<init>", "(J[J[II)V"); split_result_class = local_engine::CreateGlobalClassReference(env, "Lio/glutenproject/vectorized/CHSplitResult;"); split_result_constructor = local_engine::GetMethodID(env, split_result_class, "<init>", "(JJJJJJ[J[JJJJ)V"); local_engine::ShuffleReader::input_stream_class = local_engine::CreateGlobalClassReference(env, "Lio/glutenproject/vectorized/ShuffleInputStream;"); local_engine::NativeSplitter::iterator_class = local_engine::CreateGlobalClassReference(env, "Lio/glutenproject/vectorized/IteratorWrapper;"); local_engine::WriteBufferFromJavaOutputStream::output_stream_class = local_engine::CreateGlobalClassReference(env, "Ljava/io/OutputStream;"); local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class = local_engine::CreateGlobalClassReference(env, "Lio/glutenproject/execution/ColumnarNativeIterator;"); local_engine::SourceFromJavaIter::serialized_record_batch_iterator_hasNext = local_engine::GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "hasNext", "()Z"); local_engine::SourceFromJavaIter::serialized_record_batch_iterator_next = local_engine::GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "next", "()[B"); local_engine::ShuffleReader::input_stream_read = local_engine::GetMethodID(env, local_engine::ShuffleReader::input_stream_class, "read", "(JJ)J"); local_engine::NativeSplitter::iterator_has_next = local_engine::GetMethodID(env, local_engine::NativeSplitter::iterator_class, "hasNext", "()Z"); local_engine::NativeSplitter::iterator_next = local_engine::GetMethodID(env, local_engine::NativeSplitter::iterator_class, "next", "()J"); local_engine::WriteBufferFromJavaOutputStream::output_stream_write = local_engine::GetMethodID(env, local_engine::WriteBufferFromJavaOutputStream::output_stream_class, "write", "([BII)V"); local_engine::WriteBufferFromJavaOutputStream::output_stream_flush = local_engine::GetMethodID(env, local_engine::WriteBufferFromJavaOutputStream::output_stream_class, "flush", "()V"); local_engine::SparkRowToCHColumn::spark_row_interator_class = local_engine::CreateGlobalClassReference(env, "Lio/glutenproject/execution/SparkRowIterator;"); local_engine::SparkRowToCHColumn::spark_row_interator_hasNext = local_engine::GetMethodID(env, local_engine::SparkRowToCHColumn::spark_row_interator_class, "hasNext", "()Z"); local_engine::SparkRowToCHColumn::spark_row_interator_next = local_engine::GetMethodID(env, local_engine::SparkRowToCHColumn::spark_row_interator_class, "next", "()[B"); local_engine::SparkRowToCHColumn::spark_row_iterator_nextBatch = local_engine::GetMethodID( env, local_engine::SparkRowToCHColumn::spark_row_interator_class, "nextBatch", "()Ljava/nio/ByteBuffer;"); local_engine::ReservationListenerWrapper::reservation_listener_class = local_engine::CreateGlobalClassReference(env, "Lio/glutenproject/memory/alloc/CHReservationListener;"); local_engine::ReservationListenerWrapper::reservation_listener_reserve = local_engine::GetMethodID(env, local_engine::ReservationListenerWrapper::reservation_listener_class, "reserve", "(J)J"); local_engine::ReservationListenerWrapper::reservation_listener_reserve_or_throw = local_engine::GetMethodID(env, local_engine::ReservationListenerWrapper::reservation_listener_class, "reserveOrThrow", "(J)V"); local_engine::ReservationListenerWrapper::reservation_listener_unreserve = local_engine::GetMethodID(env, local_engine::ReservationListenerWrapper::reservation_listener_class, "unreserve", "(J)J"); native_metrics_class = local_engine::CreateGlobalClassReference(env, "Lio/glutenproject/metrics/NativeMetrics;"); native_metrics_constructor = local_engine::GetMethodID(env, native_metrics_class, "<init>", "(Ljava/lang/String;)V"); local_engine::BroadCastJoinBuilder::init(env); local_engine::JNIUtils::vm = vm; return JNI_VERSION_1_8; } JNIEXPORT void JNI_OnUnload(JavaVM * vm, void * /*reserved*/) { local_engine::BackendFinalizerUtil::finalizeGlobally(); JNIEnv * env; vm->GetEnv(reinterpret_cast<void **>(&env), JNI_VERSION_1_8); local_engine::JniErrorsGlobalState::instance().destroy(env); local_engine::BroadCastJoinBuilder::destroy(env); env->DeleteGlobalRef(spark_row_info_class); env->DeleteGlobalRef(block_stripes_class); env->DeleteGlobalRef(split_result_class); env->DeleteGlobalRef(local_engine::ShuffleReader::input_stream_class); env->DeleteGlobalRef(local_engine::NativeSplitter::iterator_class); env->DeleteGlobalRef(local_engine::WriteBufferFromJavaOutputStream::output_stream_class); env->DeleteGlobalRef(local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class); env->DeleteGlobalRef(local_engine::SparkRowToCHColumn::spark_row_interator_class); env->DeleteGlobalRef(local_engine::ReservationListenerWrapper::reservation_listener_class); env->DeleteGlobalRef(native_metrics_class); } JNIEXPORT void Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeInitNative(JNIEnv * env, jobject, jbyteArray conf_plan) { LOCAL_ENGINE_JNI_METHOD_START jsize plan_buf_size = env->GetArrayLength(conf_plan); jbyte * plan_buf_addr = env->GetByteArrayElements(conf_plan, nullptr); std::string plan_str; plan_str.assign(reinterpret_cast<const char *>(plan_buf_addr), plan_buf_size); local_engine::BackendInitializerUtil::init(&plan_str); env->ReleaseByteArrayElements(conf_plan, plan_buf_addr, JNI_ABORT); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT void Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeFinalizeNative(JNIEnv * env) { LOCAL_ENGINE_JNI_METHOD_START local_engine::BackendFinalizerUtil::finalizeSessionally(); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jlong Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeCreateKernelWithIterator( JNIEnv * env, jobject /*obj*/, jlong allocator_id, jbyteArray plan, jobjectArray split_infos, jobjectArray iter_arr, jbyteArray conf_plan, jboolean materialize_input) { LOCAL_ENGINE_JNI_METHOD_START auto query_context = local_engine::getAllocator(allocator_id)->query_context; // by task update new configs ( in case of dynamic config update ) jsize plan_buf_size = env->GetArrayLength(conf_plan); jbyte * plan_buf_addr = env->GetByteArrayElements(conf_plan, nullptr); std::string plan_str; plan_str.assign(reinterpret_cast<const char *>(plan_buf_addr), plan_buf_size); local_engine::BackendInitializerUtil::updateConfig(query_context, &plan_str); local_engine::SerializedPlanParser parser(query_context); jsize iter_num = env->GetArrayLength(iter_arr); for (jsize i = 0; i < iter_num; i++) { jobject iter = env->GetObjectArrayElement(iter_arr, i); iter = env->NewGlobalRef(iter); parser.addInputIter(iter, materialize_input); } for (jsize i = 0, split_info_arr_size = env->GetArrayLength(split_infos); i < split_info_arr_size; i++) { jbyteArray split_info = static_cast<jbyteArray>(env->GetObjectArrayElement(split_infos, i)); jsize split_info_size = env->GetArrayLength(split_info); jbyte * split_info_addr = env->GetByteArrayElements(split_info, nullptr); std::string split_info_str; split_info_str.assign(reinterpret_cast<const char *>(split_info_addr), split_info_size); parser.addSplitInfo(split_info_str); } jsize plan_size = env->GetArrayLength(plan); jbyte * plan_address = env->GetByteArrayElements(plan, nullptr); std::string plan_string; plan_string.assign(reinterpret_cast<const char *>(plan_address), plan_size); auto query_plan = parser.parse(plan_string); local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(parser.query_context, query_context); executor->setMetric(parser.getMetric()); executor->setExtraPlanHolder(parser.extra_plan_holder); executor->execute(std::move(query_plan)); env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT); env->ReleaseByteArrayElements(conf_plan, plan_buf_addr, JNI_ABORT); return reinterpret_cast<jlong>(executor); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT jboolean Java_io_glutenproject_row_RowIterator_nativeHasNext(JNIEnv * env, jobject /*obj*/, jlong executor_address) { LOCAL_ENGINE_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address); return executor->hasNext(); LOCAL_ENGINE_JNI_METHOD_END(env, false) } JNIEXPORT jobject Java_io_glutenproject_row_RowIterator_nativeNext(JNIEnv * env, jobject /*obj*/, jlong executor_address) { LOCAL_ENGINE_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address); local_engine::SparkRowInfoPtr spark_row_info = executor->next(); auto * offsets_arr = env->NewLongArray(spark_row_info->getNumRows()); const auto * offsets_src = reinterpret_cast<const jlong *>(spark_row_info->getOffsets().data()); env->SetLongArrayRegion(offsets_arr, 0, spark_row_info->getNumRows(), offsets_src); auto * lengths_arr = env->NewLongArray(spark_row_info->getNumRows()); const auto * lengths_src = reinterpret_cast<const jlong *>(spark_row_info->getLengths().data()); env->SetLongArrayRegion(lengths_arr, 0, spark_row_info->getNumRows(), lengths_src); int64_t address = reinterpret_cast<int64_t>(spark_row_info->getBufferAddress()); int64_t column_number = reinterpret_cast<int64_t>(spark_row_info->getNumCols()); int64_t total_size = reinterpret_cast<int64_t>(spark_row_info->getTotalBytes()); jobject spark_row_info_object = env->NewObject(spark_row_info_class, spark_row_info_constructor, offsets_arr, lengths_arr, address, column_number, total_size); return spark_row_info_object; LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) } JNIEXPORT void Java_io_glutenproject_row_RowIterator_nativeClose(JNIEnv * env, jobject /*obj*/, jlong executor_address) { LOCAL_ENGINE_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address); delete executor; LOCAL_ENGINE_JNI_METHOD_END(env, ) } // Columnar Iterator JNIEXPORT jboolean Java_io_glutenproject_vectorized_BatchIterator_nativeHasNext(JNIEnv * env, jobject /*obj*/, jlong executor_address) { LOCAL_ENGINE_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address); return executor->hasNext(); LOCAL_ENGINE_JNI_METHOD_END(env, false) } JNIEXPORT jlong Java_io_glutenproject_vectorized_BatchIterator_nativeCHNext(JNIEnv * env, jobject /*obj*/, jlong executor_address) { LOCAL_ENGINE_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address); DB::Block * column_batch = executor->nextColumnar(); // LOG_DEBUG(&Poco::Logger::get("jni"), "row size of the column batch: {}", column_batch->rows()); return reinterpret_cast<Int64>(column_batch); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT void Java_io_glutenproject_vectorized_BatchIterator_nativeClose(JNIEnv * env, jobject /*obj*/, jlong executor_address) { LOCAL_ENGINE_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address); delete executor; LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jobject Java_io_glutenproject_vectorized_BatchIterator_nativeFetchMetrics(JNIEnv * env, jobject /*obj*/, jlong executor_address) { LOCAL_ENGINE_JNI_METHOD_START /// Collect metrics only if optimizations are disabled, otherwise coredump would happen. local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address); auto metric = executor->getMetric(); String metrics_json = metric ? local_engine::RelMetricSerializer::serializeRelMetric(metric) : ""; LOG_DEBUG(&Poco::Logger::get("jni"), "{}", metrics_json); jobject native_metrics = env->NewObject(native_metrics_class, native_metrics_constructor, stringTojstring(env, metrics_json.c_str())); return native_metrics; LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) } JNIEXPORT void Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeSetJavaTmpDir(JNIEnv * /*env*/, jobject /*obj*/, jstring /*dir*/) { } JNIEXPORT void Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeSetBatchSize(JNIEnv * /*env*/, jobject /*obj*/, jint /*batch_size*/) { } JNIEXPORT void Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeSetMetricsTime( JNIEnv * /*env*/, jobject /*obj*/, jboolean /*setMetricsTime*/) { } JNIEXPORT jboolean Java_io_glutenproject_vectorized_CHColumnVector_nativeHasNull(JNIEnv * env, jobject obj, jlong block_address, jint column_position) { LOCAL_ENGINE_JNI_METHOD_START DB::Block * block = reinterpret_cast<DB::Block *>(block_address); auto col = getColumnFromColumnVector(env, obj, block_address, column_position); if (!col.column->isNullable()) { return false; } else { const auto * nullable = checkAndGetColumn<DB::ColumnNullable>(*col.column); size_t num_nulls = std::accumulate(nullable->getNullMapData().begin(), nullable->getNullMapData().end(), 0); return num_nulls < block->rows(); } LOCAL_ENGINE_JNI_METHOD_END(env, false) } JNIEXPORT jint Java_io_glutenproject_vectorized_CHColumnVector_nativeNumNulls(JNIEnv * env, jobject obj, jlong block_address, jint column_position) { LOCAL_ENGINE_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); if (!col.column->isNullable()) { return 0; } else { const auto * nullable = checkAndGetColumn<DB::ColumnNullable>(*col.column); return std::accumulate(nullable->getNullMapData().begin(), nullable->getNullMapData().end(), 0); } LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT jboolean Java_io_glutenproject_vectorized_CHColumnVector_nativeIsNullAt( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { LOCAL_ENGINE_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); return col.column->isNullAt(row_id); LOCAL_ENGINE_JNI_METHOD_END(env, false) } JNIEXPORT jboolean Java_io_glutenproject_vectorized_CHColumnVector_nativeGetBoolean( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { LOCAL_ENGINE_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); DB::ColumnPtr nested_col = col.column; if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get())) nested_col = nullable_col->getNestedColumnPtr(); return nested_col->getBool(row_id); LOCAL_ENGINE_JNI_METHOD_END(env, false) } JNIEXPORT jbyte Java_io_glutenproject_vectorized_CHColumnVector_nativeGetByte( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { LOCAL_ENGINE_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); DB::ColumnPtr nested_col = col.column; if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get())) nested_col = nullable_col->getNestedColumnPtr(); return reinterpret_cast<const jbyte *>(nested_col->getDataAt(row_id).data)[0]; LOCAL_ENGINE_JNI_METHOD_END(env, 0) } JNIEXPORT jshort Java_io_glutenproject_vectorized_CHColumnVector_nativeGetShort( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { LOCAL_ENGINE_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); DB::ColumnPtr nested_col = col.column; if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get())) nested_col = nullable_col->getNestedColumnPtr(); return reinterpret_cast<const jshort *>(nested_col->getDataAt(row_id).data)[0]; LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT jint Java_io_glutenproject_vectorized_CHColumnVector_nativeGetInt( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { LOCAL_ENGINE_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); DB::ColumnPtr nested_col = col.column; if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get())) nested_col = nullable_col->getNestedColumnPtr(); if (col.type->getTypeId() == DB::TypeIndex::Date) return nested_col->getUInt(row_id); else return nested_col->getInt(row_id); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT jlong Java_io_glutenproject_vectorized_CHColumnVector_nativeGetLong( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { LOCAL_ENGINE_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); DB::ColumnPtr nested_col = col.column; if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get())) nested_col = nullable_col->getNestedColumnPtr(); return nested_col->getInt(row_id); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT jfloat Java_io_glutenproject_vectorized_CHColumnVector_nativeGetFloat( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { LOCAL_ENGINE_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); DB::ColumnPtr nested_col = col.column; if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get())) nested_col = nullable_col->getNestedColumnPtr(); return nested_col->getFloat32(row_id); LOCAL_ENGINE_JNI_METHOD_END(env, 0.0) } JNIEXPORT jdouble Java_io_glutenproject_vectorized_CHColumnVector_nativeGetDouble( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { LOCAL_ENGINE_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); DB::ColumnPtr nested_col = col.column; if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get())) nested_col = nullable_col->getNestedColumnPtr(); return nested_col->getFloat64(row_id); LOCAL_ENGINE_JNI_METHOD_END(env, 0.0) } JNIEXPORT jstring Java_io_glutenproject_vectorized_CHColumnVector_nativeGetString( JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position) { LOCAL_ENGINE_JNI_METHOD_START auto col = getColumnFromColumnVector(env, obj, block_address, column_position); DB::ColumnPtr nested_col = col.column; if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get())) nested_col = nullable_col->getNestedColumnPtr(); const auto * string_col = checkAndGetColumn<DB::ColumnString>(nested_col.get()); auto result = string_col->getDataAt(row_id); return local_engine::charTojstring(env, result.toString().c_str()); LOCAL_ENGINE_JNI_METHOD_END(env, local_engine::charTojstring(env, "")) } // native block JNIEXPORT void Java_io_glutenproject_vectorized_CHNativeBlock_nativeClose(JNIEnv * /*env*/, jobject /*obj*/, jlong /*block_address*/) { } JNIEXPORT jint Java_io_glutenproject_vectorized_CHNativeBlock_nativeNumRows(JNIEnv * env, jobject /*obj*/, jlong block_address) { LOCAL_ENGINE_JNI_METHOD_START DB::Block * block = reinterpret_cast<DB::Block *>(block_address); return block->rows(); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT jint Java_io_glutenproject_vectorized_CHNativeBlock_nativeNumColumns(JNIEnv * env, jobject /*obj*/, jlong block_address) { LOCAL_ENGINE_JNI_METHOD_START auto * block = reinterpret_cast<DB::Block *>(block_address); return block->columns(); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT jbyteArray Java_io_glutenproject_vectorized_CHNativeBlock_nativeColumnType(JNIEnv * env, jobject /*obj*/, jlong block_address, jint position) { LOCAL_ENGINE_JNI_METHOD_START auto * block = reinterpret_cast<DB::Block *>(block_address); const auto & col = block->getByPosition(position); std::string substrait_type; dbms::SerializedPlanBuilder::buildType(col.type, substrait_type); return local_engine::stringTojbyteArray(env, substrait_type); LOCAL_ENGINE_JNI_METHOD_END(env, local_engine::stringTojbyteArray(env, "")) } JNIEXPORT jlong Java_io_glutenproject_vectorized_CHNativeBlock_nativeTotalBytes(JNIEnv * env, jobject /*obj*/, jlong block_address) { LOCAL_ENGINE_JNI_METHOD_START auto * block = reinterpret_cast<DB::Block *>(block_address); return block->bytes(); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT jlong Java_io_glutenproject_vectorized_CHStreamReader_createNativeShuffleReader( JNIEnv * env, jclass /*clazz*/, jobject input_stream, jboolean compressed, jlong max_shuffle_read_rows, jlong max_shuffle_read_bytes) { LOCAL_ENGINE_JNI_METHOD_START auto * input = env->NewGlobalRef(input_stream); auto read_buffer = std::make_unique<local_engine::ReadBufferFromJavaInputStream>(input); auto * shuffle_reader = new local_engine::ShuffleReader(std::move(read_buffer), compressed, max_shuffle_read_rows, max_shuffle_read_bytes); return reinterpret_cast<jlong>(shuffle_reader); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT jlong Java_io_glutenproject_vectorized_CHStreamReader_nativeNext(JNIEnv * env, jobject /*obj*/, jlong shuffle_reader) { LOCAL_ENGINE_JNI_METHOD_START local_engine::ShuffleReader * reader = reinterpret_cast<local_engine::ShuffleReader *>(shuffle_reader); DB::Block * block = reader->read(); return reinterpret_cast<jlong>(block); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT void Java_io_glutenproject_vectorized_CHStreamReader_nativeClose(JNIEnv * env, jobject /*obj*/, jlong shuffle_reader) { LOCAL_ENGINE_JNI_METHOD_START local_engine::ShuffleReader * reader = reinterpret_cast<local_engine::ShuffleReader *>(shuffle_reader); delete reader; LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jlong Java_io_glutenproject_vectorized_CHCoalesceOperator_createNativeOperator(JNIEnv * env, jobject /*obj*/, jint buf_size) { LOCAL_ENGINE_JNI_METHOD_START local_engine::BlockCoalesceOperator * instance = new local_engine::BlockCoalesceOperator(buf_size); return reinterpret_cast<jlong>(instance); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT void Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeMergeBlock( JNIEnv * env, jobject /*obj*/, jlong instance_address, jlong block_address) { LOCAL_ENGINE_JNI_METHOD_START local_engine::BlockCoalesceOperator * instance = reinterpret_cast<local_engine::BlockCoalesceOperator *>(instance_address); DB::Block * block = reinterpret_cast<DB::Block *>(block_address); auto new_block = DB::Block(*block); instance->mergeBlock(new_block); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jboolean Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeIsFull(JNIEnv * env, jobject /*obj*/, jlong instance_address) { LOCAL_ENGINE_JNI_METHOD_START local_engine::BlockCoalesceOperator * instance = reinterpret_cast<local_engine::BlockCoalesceOperator *>(instance_address); bool full = instance->isFull(); return full ? JNI_TRUE : JNI_FALSE; LOCAL_ENGINE_JNI_METHOD_END(env, false) } JNIEXPORT jlong Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeRelease(JNIEnv * env, jobject /*obj*/, jlong instance_address) { LOCAL_ENGINE_JNI_METHOD_START local_engine::BlockCoalesceOperator * instance = reinterpret_cast<local_engine::BlockCoalesceOperator *>(instance_address); auto * block = instance->releaseBlock(); Int64 address = reinterpret_cast<jlong>(block); return address; LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT void Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeClose(JNIEnv * env, jobject /*obj*/, jlong instance_address) { LOCAL_ENGINE_JNI_METHOD_START local_engine::BlockCoalesceOperator * instance = reinterpret_cast<local_engine::BlockCoalesceOperator *>(instance_address); delete instance; LOCAL_ENGINE_JNI_METHOD_END(env, ) } // Splitter Jni Wrapper JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nativeMake( JNIEnv * env, jobject, jstring short_name, jint num_partitions, jbyteArray expr_list, jbyteArray out_expr_list, jint shuffle_id, jlong map_id, jint split_size, jstring codec, jstring data_file, jstring local_dirs, jint num_sub_dirs, jboolean prefer_spill, jlong spill_threshold, jstring hash_algorithm, jboolean throw_if_memory_exceed, jboolean flush_block_buffer_before_evict) { LOCAL_ENGINE_JNI_METHOD_START std::string hash_exprs; std::string out_exprs; if (expr_list != nullptr) { int len = env->GetArrayLength(expr_list); auto * str = reinterpret_cast<jbyte *>(new char[len]); memset(str, 0, len); env->GetByteArrayRegion(expr_list, 0, len, str); hash_exprs = std::string(str, str + len); delete[] str; } if (out_expr_list != nullptr) { int len = env->GetArrayLength(out_expr_list); auto * str = reinterpret_cast<jbyte *>(new char[len]); memset(str, 0, len); env->GetByteArrayRegion(out_expr_list, 0, len, str); out_exprs = std::string(str, str + len); delete[] str; } Poco::StringTokenizer local_dirs_tokenizer(jstring2string(env, local_dirs), ","); std::vector<std::string> local_dirs_list; local_dirs_list.insert(local_dirs_list.end(), local_dirs_tokenizer.begin(), local_dirs_tokenizer.end()); local_engine::SplitOptions options{ .split_size = static_cast<size_t>(split_size), .io_buffer_size = DB::DBMS_DEFAULT_BUFFER_SIZE, .data_file = jstring2string(env, data_file), .local_dirs_list = std::move(local_dirs_list), .num_sub_dirs = num_sub_dirs, .shuffle_id = shuffle_id, .map_id = static_cast<int>(map_id), .partition_num = static_cast<size_t>(num_partitions), .hash_exprs = hash_exprs, .out_exprs = out_exprs, .compress_method = jstring2string(env, codec), .spill_threshold = static_cast<size_t>(spill_threshold), .hash_algorithm = jstring2string(env, hash_algorithm), .throw_if_memory_exceed = static_cast<bool>(throw_if_memory_exceed), .flush_block_buffer_before_evict = static_cast<bool>(flush_block_buffer_before_evict)}; auto name = jstring2string(env, short_name); local_engine::SplitterHolder * splitter; if (prefer_spill) splitter = new local_engine::SplitterHolder{.splitter = local_engine::ShuffleSplitter::create(name, options)}; else splitter = new local_engine::SplitterHolder{.splitter = std::make_unique<local_engine::CachedShuffleWriter>(name, options)}; return reinterpret_cast<jlong>(splitter); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nativeMakeForRSS( JNIEnv * env, jobject, jstring short_name, jint num_partitions, jbyteArray expr_list, jbyteArray out_expr_list, jint shuffle_id, jlong map_id, jint split_size, jstring codec, jlong spill_threshold, jstring hash_algorithm, jobject pusher, jboolean throw_if_memory_exceed, jboolean flush_block_buffer_before_evict) { LOCAL_ENGINE_JNI_METHOD_START std::string hash_exprs; std::string out_exprs; if (expr_list != nullptr) { int len = env->GetArrayLength(expr_list); auto * str = reinterpret_cast<jbyte *>(new char[len]); env->GetByteArrayRegion(expr_list, 0, len, str); hash_exprs = std::string(str, str + len); delete[] str; } if (out_expr_list != nullptr) { int len = env->GetArrayLength(out_expr_list); auto * str = reinterpret_cast<jbyte *>(new char[len]); env->GetByteArrayRegion(out_expr_list, 0, len, str); out_exprs = std::string(str, str + len); delete[] str; } local_engine::SplitOptions options{ .split_size = static_cast<size_t>(split_size), .io_buffer_size = DB::DBMS_DEFAULT_BUFFER_SIZE, .shuffle_id = shuffle_id, .map_id = static_cast<int>(map_id), .partition_num = static_cast<size_t>(num_partitions), .hash_exprs = hash_exprs, .out_exprs = out_exprs, .compress_method = jstring2string(env, codec), .spill_threshold = static_cast<size_t>(spill_threshold), .hash_algorithm = jstring2string(env, hash_algorithm), .throw_if_memory_exceed = static_cast<bool>(throw_if_memory_exceed), .flush_block_buffer_before_evict = static_cast<bool>(flush_block_buffer_before_evict)}; auto name = jstring2string(env, short_name); local_engine::SplitterHolder * splitter; splitter = new local_engine::SplitterHolder{.splitter = std::make_unique<local_engine::CachedShuffleWriter>(name, options, pusher)}; return reinterpret_cast<jlong>(splitter); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT void Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_split(JNIEnv * env, jobject, jlong splitterId, jlong block) { LOCAL_ENGINE_JNI_METHOD_START local_engine::SplitterHolder * splitter = reinterpret_cast<local_engine::SplitterHolder *>(splitterId); DB::Block * data = reinterpret_cast<DB::Block *>(block); splitter->splitter->split(*data); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_evict(JNIEnv * env, jobject, jlong splitterId) { LOCAL_ENGINE_JNI_METHOD_START local_engine::SplitterHolder * splitter = reinterpret_cast<local_engine::SplitterHolder *>(splitterId); auto size = splitter->splitter->evictPartitions(); return size; LOCAL_ENGINE_JNI_METHOD_END(env, 0) } JNIEXPORT jobject Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_stop(JNIEnv * env, jobject, jlong splitterId) { LOCAL_ENGINE_JNI_METHOD_START local_engine::SplitterHolder * splitter = reinterpret_cast<local_engine::SplitterHolder *>(splitterId); auto result = splitter->splitter->stop(); const auto & partition_lengths = result.partition_lengths; auto * partition_length_arr = env->NewLongArray(partition_lengths.size()); const auto * src = reinterpret_cast<const jlong *>(partition_lengths.data()); env->SetLongArrayRegion(partition_length_arr, 0, partition_lengths.size(), src); const auto & raw_partition_lengths = result.raw_partition_lengths; auto * raw_partition_length_arr = env->NewLongArray(raw_partition_lengths.size()); const auto * raw_src = reinterpret_cast<const jlong *>(raw_partition_lengths.data()); env->SetLongArrayRegion(raw_partition_length_arr, 0, raw_partition_lengths.size(), raw_src); jobject split_result = env->NewObject( split_result_class, split_result_constructor, result.total_compute_pid_time, result.total_write_time, result.total_spill_time, result.total_compress_time, result.total_bytes_written, result.total_bytes_spilled, partition_length_arr, raw_partition_length_arr, result.total_split_time, result.total_io_time, result.total_serialize_time); return split_result; LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) } JNIEXPORT void Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_close(JNIEnv * env, jobject, jlong splitterId) { LOCAL_ENGINE_JNI_METHOD_START local_engine::SplitterHolder * splitter = reinterpret_cast<local_engine::SplitterHolder *>(splitterId); delete splitter; LOCAL_ENGINE_JNI_METHOD_END(env, ) } // CHBlockConverterJniWrapper JNIEXPORT jobject Java_io_glutenproject_vectorized_CHBlockConverterJniWrapper_convertColumnarToRow(JNIEnv * env, jclass, jlong block_address, jintArray masks) { LOCAL_ENGINE_JNI_METHOD_START local_engine::CHColumnToSparkRow converter; std::unique_ptr<local_engine::SparkRowInfo> spark_row_info = nullptr; local_engine::MaskVector mask = nullptr; DB::Block * block = reinterpret_cast<DB::Block *>(block_address); if (masks != nullptr) { jint size = env->GetArrayLength(masks); jboolean is_cp = JNI_FALSE; jint * values = env->GetIntArrayElements(masks, &is_cp); mask = std::make_unique<std::vector<size_t>>(); for (int j = 0; j < size; j++) mask->push_back(values[j]); env->ReleaseIntArrayElements(masks, values, JNI_ABORT); } spark_row_info = converter.convertCHColumnToSparkRow(*block, mask); auto * offsets_arr = env->NewLongArray(spark_row_info->getNumRows()); const auto * offsets_src = reinterpret_cast<const jlong *>(spark_row_info->getOffsets().data()); env->SetLongArrayRegion(offsets_arr, 0, spark_row_info->getNumRows(), offsets_src); auto * lengths_arr = env->NewLongArray(spark_row_info->getNumRows()); const auto * lengths_src = reinterpret_cast<const jlong *>(spark_row_info->getLengths().data()); env->SetLongArrayRegion(lengths_arr, 0, spark_row_info->getNumRows(), lengths_src); int64_t address = reinterpret_cast<int64_t>(spark_row_info->getBufferAddress()); int64_t column_number = reinterpret_cast<int64_t>(spark_row_info->getNumCols()); int64_t total_size = reinterpret_cast<int64_t>(spark_row_info->getTotalBytes()); jobject spark_row_info_object = env->NewObject(spark_row_info_class, spark_row_info_constructor, offsets_arr, lengths_arr, address, column_number, total_size); return spark_row_info_object; LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) } JNIEXPORT void Java_io_glutenproject_vectorized_CHBlockConverterJniWrapper_freeMemory(JNIEnv * env, jclass, jlong address, jlong size) { LOCAL_ENGINE_JNI_METHOD_START local_engine::CHColumnToSparkRow converter; converter.freeMem(reinterpret_cast<char *>(address), size); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jlong Java_io_glutenproject_vectorized_CHBlockConverterJniWrapper_convertSparkRowsToCHColumn( JNIEnv * env, jclass, jobject java_iter, jobjectArray names, jobjectArray types) { LOCAL_ENGINE_JNI_METHOD_START using namespace std; int num_columns = env->GetArrayLength(names); vector<string> c_names; vector<string> c_types; c_names.reserve(num_columns); for (int i = 0; i < num_columns; i++) { auto * name = static_cast<jstring>(env->GetObjectArrayElement(names, i)); c_names.emplace_back(jstring2string(env, name)); auto * type = static_cast<jbyteArray>(env->GetObjectArrayElement(types, i)); auto type_length = env->GetArrayLength(type); jbyte * type_ptr = env->GetByteArrayElements(type, nullptr); string str_type(reinterpret_cast<const char *>(type_ptr), type_length); c_types.emplace_back(std::move(str_type)); env->ReleaseByteArrayElements(type, type_ptr, JNI_ABORT); env->DeleteLocalRef(name); env->DeleteLocalRef(type); } local_engine::SparkRowToCHColumn converter; auto * block = converter.convertSparkRowItrToCHColumn(java_iter, c_names, c_types); return reinterpret_cast<jlong>(block); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT void Java_io_glutenproject_vectorized_CHBlockConverterJniWrapper_freeBlock(JNIEnv * env, jclass, jlong block_address) { LOCAL_ENGINE_JNI_METHOD_START local_engine::SparkRowToCHColumn converter; converter.freeBlock(reinterpret_cast<DB::Block *>(block_address)); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jlong Java_io_glutenproject_vectorized_CHBlockWriterJniWrapper_nativeCreateInstance(JNIEnv * env, jobject) { LOCAL_ENGINE_JNI_METHOD_START auto * writer = new local_engine::NativeWriterInMemory(); return reinterpret_cast<jlong>(writer); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT void Java_io_glutenproject_vectorized_CHBlockWriterJniWrapper_nativeWrite(JNIEnv * env, jobject, jlong instance, jlong block_address) { LOCAL_ENGINE_JNI_METHOD_START auto * writer = reinterpret_cast<local_engine::NativeWriterInMemory *>(instance); auto * block = reinterpret_cast<DB::Block *>(block_address); writer->write(*block); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jint Java_io_glutenproject_vectorized_CHBlockWriterJniWrapper_nativeResultSize(JNIEnv * env, jobject, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START auto * writer = reinterpret_cast<local_engine::NativeWriterInMemory *>(instance); return static_cast<jint>(writer->collect().size()); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT void Java_io_glutenproject_vectorized_CHBlockWriterJniWrapper_nativeCollect(JNIEnv * env, jobject, jlong instance, jbyteArray result) { LOCAL_ENGINE_JNI_METHOD_START auto * writer = reinterpret_cast<local_engine::NativeWriterInMemory *>(instance); auto data = writer->collect(); env->SetByteArrayRegion(result, 0, data.size(), reinterpret_cast<const jbyte *>(data.data())); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT void Java_io_glutenproject_vectorized_CHBlockWriterJniWrapper_nativeClose(JNIEnv * env, jobject, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START auto * writer = reinterpret_cast<local_engine::NativeWriterInMemory *>(instance); delete writer; LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_nativeInitFileWriterWrapper( JNIEnv * env, jobject, jstring file_uri_, jobjectArray names_, jstring format_hint_) { LOCAL_ENGINE_JNI_METHOD_START int num_columns = env->GetArrayLength(names_); std::vector<std::string> names; names.reserve(num_columns); for (int i = 0; i < num_columns; i++) { auto * name = static_cast<jstring>(env->GetObjectArrayElement(names_, i)); names.emplace_back(jstring2string(env, name)); env->DeleteLocalRef(name); } auto file_uri = jstring2string(env, file_uri_); auto format_hint = jstring2string(env, format_hint_); // for HiveFileFormat, the file url may not end with .parquet, so we pass in the format as a hint auto * writer = local_engine::createFileWriterWrapper(file_uri, names, format_hint); return reinterpret_cast<jlong>(writer); LOCAL_ENGINE_JNI_METHOD_END(env, 0) } JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_nativeInitMergeTreeWriterWrapper( JNIEnv * env, jobject, jbyteArray plan_, jbyteArray split_info_, jstring uuid_, jstring task_id_, jstring partition_dir_, jstring bucket_dir_) { LOCAL_ENGINE_JNI_METHOD_START const auto uuid_str = jstring2string(env, uuid_); const auto task_id = jstring2string(env, task_id_); const auto partition_dir = jstring2string(env, partition_dir_); const auto bucket_dir = jstring2string(env, bucket_dir_); jsize plan_buf_size = env->GetArrayLength(plan_); jbyte * plan_buf_addr = env->GetByteArrayElements(plan_, nullptr); std::string plan_str; plan_str.assign(reinterpret_cast<const char *>(plan_buf_addr), plan_buf_size); jsize split_info_size = env->GetArrayLength(split_info_); jbyte * split_info_addr = env->GetByteArrayElements(split_info_, nullptr); std::string split_info_str; split_info_str.assign(reinterpret_cast<const char *>(split_info_addr), split_info_size); auto plan_ptr = std::make_unique<substrait::Plan>(); /// https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data /// Parsing may fail when the number of recursive layers is large. /// Here, set a limit large enough to avoid this problem. /// Once this problem occurs, it is difficult to troubleshoot, because the pb of c++ will not provide any valid information google::protobuf::io::CodedInputStream coded_in( reinterpret_cast<const uint8_t *>(plan_str.data()), static_cast<int>(plan_str.size())); coded_in.SetRecursionLimit(100000); auto ok = plan_ptr->ParseFromCodedStream(&coded_in); if (!ok) throw DB::Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::Plan from string failed"); substrait::ReadRel::ExtensionTable extension_table = local_engine::SerializedPlanParser::parseExtensionTable(split_info_str); auto storage = local_engine::MergeTreeRelParser::parseStorage( plan_ptr->relations()[0].root().input(), extension_table, local_engine::SerializedPlanParser::global_context); auto uuid = uuid_str + "_" + task_id; auto * writer = new local_engine::SparkMergeTreeWriter( *storage, storage->getInMemoryMetadataPtr(), local_engine::SerializedPlanParser::global_context, uuid, partition_dir, bucket_dir); env->ReleaseByteArrayElements(plan_, plan_buf_addr, JNI_ABORT); env->ReleaseByteArrayElements(split_info_, split_info_addr, JNI_ABORT); return reinterpret_cast<jlong>(writer); LOCAL_ENGINE_JNI_METHOD_END(env, 0) } JNIEXPORT void Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_write(JNIEnv * env, jobject, jlong instanceId, jlong block_address) { LOCAL_ENGINE_JNI_METHOD_START auto * writer = reinterpret_cast<local_engine::NormalFileWriter *>(instanceId); auto * block = reinterpret_cast<DB::Block *>(block_address); writer->consume(*block); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT void Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_close(JNIEnv * env, jobject, jlong instanceId) { LOCAL_ENGINE_JNI_METHOD_START auto * writer = reinterpret_cast<local_engine::NormalFileWriter *>(instanceId); writer->close(); delete writer; LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT void Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_writeToMergeTree(JNIEnv * env, jobject, jlong instanceId, jlong block_address) { LOCAL_ENGINE_JNI_METHOD_START auto * writer = reinterpret_cast<local_engine::SparkMergeTreeWriter *>(instanceId); auto * block = reinterpret_cast<DB::Block *>(block_address); writer->write(*block); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_closeMergeTreeWriter(JNIEnv * env, jobject, jlong instanceId) { LOCAL_ENGINE_JNI_METHOD_START auto * writer = reinterpret_cast<local_engine::SparkMergeTreeWriter *>(instanceId); writer->finalize(); auto part_infos = writer->getAllPartInfo(); auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(part_infos); delete writer; return stringTojstring(env, json_info.c_str()); LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) } JNIEXPORT jobject Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_splitBlockByPartitionAndBucket( JNIEnv * env, jclass, jlong blockAddress, jintArray partitionColIndice, jboolean hasBucket, jboolean reserve_partition_columns) { LOCAL_ENGINE_JNI_METHOD_START auto * block = reinterpret_cast<DB::Block *>(blockAddress); int * pIndice = env->GetIntArrayElements(partitionColIndice, nullptr); int size = env->GetArrayLength(partitionColIndice); std::vector<size_t> partition_col_indice_vec; for (int i = 0; i < size; ++i) partition_col_indice_vec.push_back(pIndice[i]); env->ReleaseIntArrayElements(partitionColIndice, pIndice, JNI_ABORT); local_engine::BlockStripes bs = local_engine::BlockStripeSplitter::split(*block, partition_col_indice_vec, hasBucket, reserve_partition_columns); auto * addresses = env->NewLongArray(bs.block_addresses.size()); env->SetLongArrayRegion(addresses, 0, bs.block_addresses.size(), bs.block_addresses.data()); auto * indices = env->NewIntArray(bs.heading_row_indice.size()); env->SetIntArrayRegion(indices, 0, bs.heading_row_indice.size(), bs.heading_row_indice.data()); jobject block_stripes = env->NewObject( block_stripes_class, block_stripes_constructor, bs.origin_block_address, addresses, indices, bs.origin_block_num_columns); return block_stripes; LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) } JNIEXPORT jlong Java_io_glutenproject_vectorized_StorageJoinBuilder_nativeBuild( JNIEnv * env, jclass, jstring key, jbyteArray in, jlong row_count_, jstring join_key_, jint join_type_, jbyteArray named_struct) { LOCAL_ENGINE_JNI_METHOD_START const auto hash_table_id = jstring2string(env, key); const auto join_key = jstring2string(env, join_key_); const jsize struct_size = env->GetArrayLength(named_struct); jbyte * struct_address = env->GetByteArrayElements(named_struct, nullptr); std::string struct_string; struct_string.assign(reinterpret_cast<const char *>(struct_address), struct_size); const auto join_type = static_cast<substrait::JoinRel_JoinType>(join_type_); const jsize length = env->GetArrayLength(in); local_engine::ReadBufferFromByteArray read_buffer_from_java_array(in, length); DB::CompressedReadBuffer input(read_buffer_from_java_array); local_engine::configureCompressedReadBuffer(input); const auto * obj = make_wrapper(local_engine::BroadCastJoinBuilder::buildJoin(hash_table_id, input, row_count_, join_key, join_type, struct_string)); env->ReleaseByteArrayElements(named_struct, struct_address, JNI_ABORT); return obj->instance(); LOCAL_ENGINE_JNI_METHOD_END(env, 0) } JNIEXPORT jlong Java_io_glutenproject_vectorized_StorageJoinBuilder_nativeCloneBuildHashTable(JNIEnv * env, jclass, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START auto * cloned = local_engine::make_wrapper(local_engine::SharedPointerWrapper<local_engine::StorageJoinFromReadBuffer>::sharedPtr(instance)); return cloned->instance(); LOCAL_ENGINE_JNI_METHOD_END(env, 0) } JNIEXPORT void Java_io_glutenproject_vectorized_StorageJoinBuilder_nativeCleanBuildHashTable(JNIEnv * env, jclass, jstring hash_table_id_, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START auto hash_table_id = jstring2string(env, hash_table_id_); local_engine::BroadCastJoinBuilder::cleanBuildHashTable(hash_table_id, instance); LOCAL_ENGINE_JNI_METHOD_END(env, ) } // BlockSplitIterator JNIEXPORT jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeCreate( JNIEnv * env, jobject, jobject in, jstring name, jstring expr, jstring schema, jint partition_num, jint buffer_size, jstring hash_algorithm) { LOCAL_ENGINE_JNI_METHOD_START local_engine::NativeSplitter::Options options; options.partition_num = partition_num; options.buffer_size = buffer_size; auto hash_algorithm_str = jstring2string(env, hash_algorithm); options.hash_algorithm.swap(hash_algorithm_str); auto expr_str = jstring2string(env, expr); std::string schema_str; if (schema) schema_str = jstring2string(env, schema); options.exprs_buffer.swap(expr_str); options.schema_buffer.swap(schema_str); local_engine::NativeSplitter::Holder * splitter = new local_engine::NativeSplitter::Holder{ .splitter = local_engine::NativeSplitter::create(jstring2string(env, name), options, in)}; return reinterpret_cast<jlong>(splitter); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT void Java_io_glutenproject_vectorized_BlockSplitIterator_nativeClose(JNIEnv * env, jobject, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START local_engine::NativeSplitter::Holder * splitter = reinterpret_cast<local_engine::NativeSplitter::Holder *>(instance); delete splitter; LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jboolean Java_io_glutenproject_vectorized_BlockSplitIterator_nativeHasNext(JNIEnv * env, jobject, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START local_engine::NativeSplitter::Holder * splitter = reinterpret_cast<local_engine::NativeSplitter::Holder *>(instance); return splitter->splitter->hasNext(); LOCAL_ENGINE_JNI_METHOD_END(env, false) } JNIEXPORT jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeNext(JNIEnv * env, jobject, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START local_engine::NativeSplitter::Holder * splitter = reinterpret_cast<local_engine::NativeSplitter::Holder *>(instance); return reinterpret_cast<jlong>(splitter->splitter->next()); LOCAL_ENGINE_JNI_METHOD_END(env, false) } JNIEXPORT jint Java_io_glutenproject_vectorized_BlockSplitIterator_nativeNextPartitionId(JNIEnv * env, jobject, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START local_engine::NativeSplitter::Holder * splitter = reinterpret_cast<local_engine::NativeSplitter::Holder *>(instance); return reinterpret_cast<jint>(splitter->splitter->nextPartitionId()); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT jlong Java_io_glutenproject_vectorized_BlockOutputStream_nativeCreate( JNIEnv * env, jobject, jobject output_stream, jbyteArray buffer, jstring codec, jboolean compressed, jint customize_buffer_size) { LOCAL_ENGINE_JNI_METHOD_START local_engine::ShuffleWriter * writer = new local_engine::ShuffleWriter(output_stream, buffer, jstring2string(env, codec), compressed, customize_buffer_size); return reinterpret_cast<jlong>(writer); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT void Java_io_glutenproject_vectorized_BlockOutputStream_nativeClose(JNIEnv * env, jobject, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START local_engine::ShuffleWriter * writer = reinterpret_cast<local_engine::ShuffleWriter *>(instance); writer->flush(); delete writer; LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT void Java_io_glutenproject_vectorized_BlockOutputStream_nativeWrite(JNIEnv * env, jobject, jlong instance, jlong block_address) { LOCAL_ENGINE_JNI_METHOD_START local_engine::ShuffleWriter * writer = reinterpret_cast<local_engine::ShuffleWriter *>(instance); DB::Block * block = reinterpret_cast<DB::Block *>(block_address); writer->write(*block); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT void Java_io_glutenproject_vectorized_BlockOutputStream_nativeFlush(JNIEnv * env, jobject, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START local_engine::ShuffleWriter * writer = reinterpret_cast<local_engine::ShuffleWriter *>(instance); writer->flush(); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jlong Java_io_glutenproject_vectorized_SimpleExpressionEval_createNativeInstance(JNIEnv * env, jclass, jobject input, jbyteArray plan) { LOCAL_ENGINE_JNI_METHOD_START auto context = DB::Context::createCopy(local_engine::SerializedPlanParser::global_context); local_engine::SerializedPlanParser parser(context); jobject iter = env->NewGlobalRef(input); parser.addInputIter(iter, false); jsize plan_size = env->GetArrayLength(plan); jbyte * plan_address = env->GetByteArrayElements(plan, nullptr); std::string plan_string; plan_string.assign(reinterpret_cast<const char *>(plan_address), plan_size); auto query_plan = parser.parse(plan_string); local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(parser.query_context, context); executor->execute(std::move(query_plan)); env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT); return reinterpret_cast<jlong>(executor); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT void Java_io_glutenproject_vectorized_SimpleExpressionEval_nativeClose(JNIEnv * env, jclass, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(instance); delete executor; LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jboolean Java_io_glutenproject_vectorized_SimpleExpressionEval_nativeHasNext(JNIEnv * env, jclass, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(instance); return executor->hasNext(); LOCAL_ENGINE_JNI_METHOD_END(env, false) } JNIEXPORT jlong Java_io_glutenproject_vectorized_SimpleExpressionEval_nativeNext(JNIEnv * env, jclass, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(instance); return reinterpret_cast<jlong>(executor->nextColumnar()); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT jlong Java_io_glutenproject_memory_alloc_CHNativeMemoryAllocator_getDefaultAllocator(JNIEnv *, jclass) { return -1; } JNIEXPORT jlong Java_io_glutenproject_memory_alloc_CHNativeMemoryAllocator_createListenableAllocator(JNIEnv * env, jclass, jobject listener) { LOCAL_ENGINE_JNI_METHOD_START auto listener_wrapper = std::make_shared<local_engine::ReservationListenerWrapper>(env->NewGlobalRef(listener)); return local_engine::initializeQuery(listener_wrapper); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT void Java_io_glutenproject_memory_alloc_CHNativeMemoryAllocator_releaseAllocator(JNIEnv * env, jclass, jlong allocator_id) { LOCAL_ENGINE_JNI_METHOD_START local_engine::releaseAllocator(allocator_id); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jlong Java_io_glutenproject_memory_alloc_CHNativeMemoryAllocator_bytesAllocated(JNIEnv * env, jclass, jlong allocator_id) { LOCAL_ENGINE_JNI_METHOD_START return local_engine::allocatorMemoryUsage(allocator_id); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } #ifdef __cplusplus } #endif