cpp-ch/local-engine/local_engine_jni.cpp (1,213 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <numeric> #include <string> #include <jni.h> #include <Builder/SerializedPlanBuilder.h> #include <Compression/CompressedReadBuffer.h> #include <DataTypes/DataTypeNullable.h> #include <Join/BroadCastJoinBuilder.h> #include <Parser/CHColumnToSparkRow.h> #include <Parser/LocalExecutor.h> #include <Parser/ParserContext.h> #include <Parser/RelParsers/MergeTreeRelParser.h> #include <Parser/RelParsers/RelParser.h> #include <Parser/RelParsers/WriteRelParser.h> #include <Parser/SerializedPlanParser.h> #include <Parser/SparkRowToCHColumn.h> #include <Parser/SubstraitParserUtils.h> #include <Shuffle/NativeSplitter.h> #include <Shuffle/NativeWriterInMemory.h> #include <Shuffle/PartitionWriter.h> #include <Shuffle/ShuffleCommon.h> #include <Shuffle/ShuffleReader.h> #include <Shuffle/ShuffleWriter.h> #include <Shuffle/SparkExchangeSink.h> #include <Shuffle/WriteBufferFromJavaOutputStream.h> #include <Storages/Cache/CacheManager.h> #include <Storages/MergeTree/MetaDataHelper.h> #include <Storages/MergeTree/SparkMergeTreeWriteSettings.h> #include <Storages/MergeTree/SparkMergeTreeWriter.h> #include <Storages/MergeTree/StorageMergeTreeFactory.h> #include <Storages/Output/BlockStripeSplitter.h> #include 
<Storages/Output/NormalFileWriter.h> #include <Storages/SubstraitSource/Delta/DeltaWriter.h> #include <jni/SharedPointerWrapper.h> #include <jni/jni_common.h> #include <jni/jni_error.h> #include <write_optimization.pb.h> #include <Poco/Logger.h> #include <Poco/StringTokenizer.h> #include <Common/CHUtil.h> #include <Common/ErrorCodes.h> #include <Common/ExceptionUtils.h> #include <Common/JNIUtils.h> #include <Common/QueryContext.h> #ifdef __cplusplus namespace DB { namespace ErrorCodes { extern const int CANNOT_PARSE_PROTOBUF_SCHEMA; extern const int UNKNOWN_EXCEPTION; } } static DB::ColumnWithTypeAndName getColumnFromColumnVector(JNIEnv * /*env*/, jobject /*obj*/, jlong block_address, jint column_position) { DB::Block * block = reinterpret_cast<DB::Block *>(block_address); return block->getByPosition(column_position); } static std::string jstring2string(JNIEnv * env, jstring string) { if (string == nullptr) return std::string(); const char * chars = env->GetStringUTFChars(string, nullptr); std::string ret(chars); env->ReleaseStringUTFChars(string, chars); return ret; } extern "C" { #endif namespace dbms { class LocalExecutor; } static jclass block_stripes_class; static jmethodID block_stripes_constructor; static jclass split_result_class; static jmethodID split_result_constructor; static jclass block_stats_class; static jmethodID block_stats_constructor; JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/) { JNIEnv * env; if (vm->GetEnv(reinterpret_cast<void **>(&env), JNI_VERSION_1_8) != JNI_OK) return JNI_ERR; local_engine::JniErrorsGlobalState::instance().initialize(env); block_stripes_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/spark/sql/execution/datasources/BlockStripes;"); block_stripes_constructor = local_engine::GetMethodID(env, block_stripes_class, "<init>", "(J[J[II)V"); split_result_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/vectorized/CHSplitResult;"); split_result_constructor = 
local_engine::GetMethodID(env, split_result_class, "<init>", "(JJJJJJ[J[JJJJJJJ)V"); block_stats_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/vectorized/BlockStats;"); block_stats_constructor = local_engine::GetMethodID(env, block_stats_class, "<init>", "(JZ)V"); local_engine::ShuffleReader::shuffle_input_stream_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/vectorized/ShuffleInputStream;"); local_engine::NativeSplitter::iterator_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/vectorized/IteratorWrapper;"); local_engine::WriteBufferFromJavaOutputStream::output_stream_class = local_engine::CreateGlobalClassReference(env, "Ljava/io/OutputStream;"); local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/execution/ColumnarNativeIterator;"); local_engine::SourceFromJavaIter::serialized_record_batch_iterator_hasNext = local_engine::GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "hasNext", "()Z"); local_engine::SourceFromJavaIter::serialized_record_batch_iterator_next = local_engine::GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "next", "()[B"); local_engine::ReadBufferFromJavaInputStream::input_stream_class = local_engine::CreateGlobalClassReference(env, "Ljava/io/InputStream;"); local_engine::ReadBufferFromJavaInputStream::input_stream_read = local_engine::GetMethodID(env, local_engine::ReadBufferFromJavaInputStream::input_stream_class, "read", "([B)I"); local_engine::ShuffleReader::shuffle_input_stream_read = local_engine::GetMethodID(env, local_engine::ShuffleReader::shuffle_input_stream_class, "read", "(JJ)J"); local_engine::NativeSplitter::iterator_has_next = local_engine::GetMethodID(env, local_engine::NativeSplitter::iterator_class, "hasNext", "()Z"); local_engine::NativeSplitter::iterator_next = 
local_engine::GetMethodID(env, local_engine::NativeSplitter::iterator_class, "next", "()J"); local_engine::WriteBufferFromJavaOutputStream::output_stream_write = local_engine::GetMethodID(env, local_engine::WriteBufferFromJavaOutputStream::output_stream_class, "write", "([BII)V"); local_engine::WriteBufferFromJavaOutputStream::output_stream_flush = local_engine::GetMethodID(env, local_engine::WriteBufferFromJavaOutputStream::output_stream_class, "flush", "()V"); local_engine::SparkRowToCHColumn::spark_row_interator_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/execution/SparkRowIterator;"); local_engine::SparkRowToCHColumn::spark_row_interator_hasNext = local_engine::GetMethodID(env, local_engine::SparkRowToCHColumn::spark_row_interator_class, "hasNext", "()Z"); local_engine::SparkRowToCHColumn::spark_row_interator_next = local_engine::GetMethodID(env, local_engine::SparkRowToCHColumn::spark_row_interator_class, "next", "()[B"); local_engine::SparkRowToCHColumn::spark_row_iterator_nextBatch = local_engine::GetMethodID( env, local_engine::SparkRowToCHColumn::spark_row_interator_class, "nextBatch", "()Ljava/nio/ByteBuffer;"); local_engine::BroadCastJoinBuilder::init(env); local_engine::CacheManager::initJNI(env); local_engine::SparkMergeTreeWriterJNI::init(env); local_engine::SparkRowInfoJNI::init(env); local_engine::JNIUtils::vm = vm; return JNI_VERSION_1_8; } JNIEXPORT void Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeInitNative(JNIEnv * env, jclass, jbyteArray conf_plan) { LOCAL_ENGINE_JNI_METHOD_START const auto conf_plan_a = local_engine::getByteArrayElementsSafe(env, conf_plan); const std::string::size_type plan_buf_size = conf_plan_a.length(); local_engine::SparkConfigs::update( {reinterpret_cast<const char *>(conf_plan_a.elems()), plan_buf_size}, [&](const local_engine::SparkConfigs::ConfigMap & spark_conf_map) { local_engine::BackendInitializerUtil::initBackend(spark_conf_map); }, true); 
LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT void Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeFinalizeNative(JNIEnv * env, jclass) { LOCAL_ENGINE_JNI_METHOD_START local_engine::BackendFinalizerUtil::finalizeSessionally(); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT void Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeDestroyNative(JNIEnv * env, jclass) { LOG_INFO(&Poco::Logger::get("jni"), "start destroy native"); local_engine::BackendFinalizerUtil::finalizeGlobally(); local_engine::JniErrorsGlobalState::instance().destroy(env); local_engine::BroadCastJoinBuilder::destroy(env); local_engine::SparkMergeTreeWriterJNI::destroy(env); local_engine::SparkRowInfoJNI::destroy(env); env->DeleteGlobalRef(block_stripes_class); env->DeleteGlobalRef(split_result_class); env->DeleteGlobalRef(block_stats_class); env->DeleteGlobalRef(local_engine::ShuffleReader::shuffle_input_stream_class); env->DeleteGlobalRef(local_engine::NativeSplitter::iterator_class); env->DeleteGlobalRef(local_engine::WriteBufferFromJavaOutputStream::output_stream_class); env->DeleteGlobalRef(local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class); env->DeleteGlobalRef(local_engine::SparkRowToCHColumn::spark_row_interator_class); } /// Set settings for the current query. It assumes that all parameters are started with `CH_RUNTIME_SETTINGS_PREFIX` prefix, /// and the prefix is removed by java before passing to C++. 
/// Applies the serialized settings in `settings` to the current query context via `local_engine::updateSettings`.
JNIEXPORT void Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_updateQueryRuntimeSettings(JNIEnv * env, jclass, jbyteArray settings)
{
    LOCAL_ENGINE_JNI_METHOD_START
    const auto query_context = local_engine::QueryContext::instance().currentQueryContext();

    const auto conf_plan_a = local_engine::getByteArrayElementsSafe(env, settings);
    const std::string::size_type conf_plan_size = conf_plan_a.length();
    local_engine::updateSettings(query_context, {reinterpret_cast<const char *>(conf_plan_a.elems()), conf_plan_size});
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}

/// Builds a LocalExecutor for a serialized substrait plan and returns its address as a jlong.
///
/// @param plan               serialized `substrait::Plan`
/// @param split_infos        array of byte[]; every element is passed to `SerializedPlanParser::addSplitInfo`
/// @param iter_arr           array of Java iterators; every element is registered as an input iterator
/// @param conf_plan          serialized per-task configuration applied to the current query context
/// @param materialize_input  forwarded to `addInputIter`
/// @param partition_index    forwarded to `ParserContext::build`
/// @return address of the newly created executor; ownership passes to the caller, which releases it through
///         `BatchIterator_nativeClose`. The `-1` given to LOCAL_ENGINE_JNI_METHOD_END is presumably the value
///         returned when the body throws.
JNIEXPORT jlong Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeCreateKernelWithIterator(
    JNIEnv * env,
    jclass,
    jbyteArray plan,
    jobjectArray split_infos,
    jobjectArray iter_arr,
    jbyteArray conf_plan,
    jboolean materialize_input,
    jint partition_index)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto query_context = local_engine::QueryContext::instance().currentQueryContext();

    // by task update new configs ( in case of dynamic config update )
    const auto conf_plan_a = local_engine::getByteArrayElementsSafe(env, conf_plan);
    const std::string::size_type conf_plan_size = conf_plan_a.length();
    local_engine::SparkConfigs::updateConfig(query_context, {reinterpret_cast<const char *>(conf_plan_a.elems()), conf_plan_size});

    const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan);
    const std::string::size_type plan_size = plan_a.length();
    auto plan_pb = local_engine::BinaryToMessage<substrait::Plan>({reinterpret_cast<const char *>(plan_a.elems()), plan_size});

    auto parser_context = local_engine::ParserContext::build(query_context, plan_pb, partition_index);
    local_engine::SerializedPlanParser parser(parser_context);

    const jsize iter_num = env->GetArrayLength(iter_arr);
    for (jsize i = 0; i < iter_num; i++)
    {
        jobject local_iter = env->GetObjectArrayElement(iter_arr, i);
        jobject global_iter = env->NewGlobalRef(local_iter);
        // Only the global reference is handed to the parser; drop the local one right away so the number of
        // live local references does not grow with the size of the array.
        env->DeleteLocalRef(local_iter);
        parser.addInputIter(global_iter, materialize_input);
    }

    const jsize split_info_arr_size = env->GetArrayLength(split_infos);
    for (jsize i = 0; i < split_info_arr_size; i++)
    {
        jbyteArray split_info = static_cast<jbyteArray>(env->GetObjectArrayElement(split_infos, i));
        {
            // Inner scope: the element accessor must be gone before the array reference itself is deleted.
            const auto split_info_a = local_engine::getByteArrayElementsSafe(env, split_info);
            const std::string::size_type split_info_size = split_info_a.length();
            parser.addSplitInfo({reinterpret_cast<const char *>(split_info_a.elems()), split_info_size});
        }
        env->DeleteLocalRef(split_info);
    }

    // Keep the executor owned until it is fully set up, so it is destroyed if one of the setters throws.
    auto executor = parser.createExecutor(plan_pb);
    LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}", reinterpret_cast<uintptr_t>(executor.get()));
    executor->setMetric(parser.getMetric());
    executor->setExtraPlanHolder(parser.extra_plan_holder);

    return reinterpret_cast<jlong>(executor.release());
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}

// Columnar Iterator

/// Returns whether the executor at `executor_address` reports more data (`LocalExecutor::hasNext`).
JNIEXPORT jboolean Java_org_apache_gluten_vectorized_BatchIterator_nativeHasNext(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
    return executor->hasNext();
    LOCAL_ENGINE_JNI_METHOD_END(env, false)
}

/// Returns the address of the block produced by `LocalExecutor::nextColumnar` for the given executor.
JNIEXPORT jlong Java_org_apache_gluten_vectorized_BatchIterator_nativeCHNext(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
    DB::Block * column_batch = executor->nextColumnar();
    // Same pointer-to-jlong conversion as every other address-returning function in this file.
    return reinterpret_cast<jlong>(column_batch);
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}

/// Cancels the executor at `executor_address` and logs its address.
JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
    executor->cancel();
    LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", reinterpret_cast<uintptr_t>(executor));
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}

// Return-type prefix of BatchIterator_nativeClose; the declarator follows on the next source line.
JNIEXPORT void
Java_org_apache_gluten_vectorized_BatchIterator_nativeClose(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
    /// Resets the current executor and deletes the executor at `executor_address`.
    LOCAL_ENGINE_JNI_METHOD_START
    auto * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
    // Logged as uintptr_t so the value matches the "Construct" / "Cancel" log lines for the same executor.
    LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}", reinterpret_cast<uintptr_t>(executor));
    local_engine::LocalExecutor::resetCurrentExecutor();
    delete executor;
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}

/// Returns the metrics of the executor serialized by `RelMetricSerializer::serializeRelMetric`,
/// or an empty string when the executor has no metric object.
JNIEXPORT jstring Java_org_apache_gluten_vectorized_BatchIterator_nativeFetchMetrics(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    /// Collect metrics only if optimizations are disabled, otherwise coredump would happen.
    const local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
    const auto metric = executor->getMetric();
    const String metrics_json = metric ? local_engine::RelMetricSerializer::serializeRelMetric(metric) : "";

    return local_engine::charTojstring(env, metrics_json.c_str());
    LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}

/// Returns true when the column at `column_position` is nullable and its null map contains a non-zero byte.
JNIEXPORT jboolean
Java_org_apache_gluten_vectorized_CHColumnVector_nativeHasNull(JNIEnv * env, jobject obj, jlong block_address, jint column_position)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
    if (!col.column->isNullable())
    {
        return false;
    }
    else
    {
        const auto * nullable = checkAndGetColumn<DB::ColumnNullable>(&*col.column);
        const auto & null_map_data = nullable->getNullMapData();
        return !DB::memoryIsZero(null_map_data.data(), 0, null_map_data.size());
    }
    LOCAL_ENGINE_JNI_METHOD_END(env, false)
}

/// Returns the sum of the null-map entries of the column (0 for a non-nullable column).
JNIEXPORT jint
Java_org_apache_gluten_vectorized_CHColumnVector_nativeNumNulls(JNIEnv * env, jobject obj, jlong block_address, jint column_position)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
    if (!col.column->isNullable())
    {
        return 0;
    }
    else
    {
        const auto * nullable = checkAndGetColumn<DB::ColumnNullable>(&*col.column);
        return std::accumulate(nullable->getNullMapData().begin(), nullable->getNullMapData().end(), 0);
    }
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}

/// Returns `isNullAt(row_id)` of the column at `column_position`.
JNIEXPORT jboolean Java_org_apache_gluten_vectorized_CHColumnVector_nativeIsNullAt(
    JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
    return col.column->isNullAt(row_id);
    LOCAL_ENGINE_JNI_METHOD_END(env, false)
}

/// Reads row `row_id` as a boolean. A nullable column is unwrapped to its nested column first.
JNIEXPORT jboolean Java_org_apache_gluten_vectorized_CHColumnVector_nativeGetBoolean(
    JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
    DB::ColumnPtr nested_col = col.column;
    if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get()))
        nested_col = nullable_col->getNestedColumnPtr();
    return nested_col->getBool(row_id);
    LOCAL_ENGINE_JNI_METHOD_END(env, false)
}

/// Reads the first byte of the raw data of row `row_id`. A nullable column is unwrapped first.
JNIEXPORT jbyte Java_org_apache_gluten_vectorized_CHColumnVector_nativeGetByte(
    JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
    DB::ColumnPtr nested_col = col.column;
    if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get()))
        nested_col = nullable_col->getNestedColumnPtr();
    return reinterpret_cast<const jbyte *>(nested_col->getDataAt(row_id).data)[0];
    LOCAL_ENGINE_JNI_METHOD_END(env, 0)
}

/// Reinterprets the raw data of row `row_id` as a jshort. A nullable column is unwrapped first.
JNIEXPORT jshort Java_org_apache_gluten_vectorized_CHColumnVector_nativeGetShort(
    JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
    DB::ColumnPtr nested_col = col.column;
    if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get()))
        nested_col = nullable_col->getNestedColumnPtr();
    return reinterpret_cast<const jshort *>(nested_col->getDataAt(row_id).data)[0];
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}

/// Reads row `row_id` as an int. Columns whose declared type id is `Date` are read with `getUInt`,
/// everything else with `getInt`. A nullable column is unwrapped first.
JNIEXPORT jint Java_org_apache_gluten_vectorized_CHColumnVector_nativeGetInt(
    JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
    DB::ColumnPtr nested_col = col.column;
    if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get()))
        nested_col = nullable_col->getNestedColumnPtr();
    if (col.type->getTypeId() == DB::TypeIndex::Date)
        return nested_col->getUInt(row_id);
    else
        return nested_col->getInt(row_id);
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}

/// Reads row `row_id` with `getInt` and returns it as a jlong. A nullable column is unwrapped first.
JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHColumnVector_nativeGetLong(
    JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
    DB::ColumnPtr nested_col = col.column;
    if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get()))
        nested_col = nullable_col->getNestedColumnPtr();
    return nested_col->getInt(row_id);
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}

/// Reads row `row_id` with `getFloat32`. A nullable column is unwrapped first.
JNIEXPORT jfloat Java_org_apache_gluten_vectorized_CHColumnVector_nativeGetFloat(
    JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
    DB::ColumnPtr nested_col = col.column;
    if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get()))
        nested_col = nullable_col->getNestedColumnPtr();
    return nested_col->getFloat32(row_id);
    LOCAL_ENGINE_JNI_METHOD_END(env, 0.0)
}

// Export prefix of CHColumnVector_nativeGetDouble; its return type and declarator follow on the next source line.
JNIEXPORT
jdouble Java_org_apache_gluten_vectorized_CHColumnVector_nativeGetDouble(
    JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
    /// Reads row `row_id` with `getFloat64`; a nullable column is replaced by its nested column first.
    LOCAL_ENGINE_JNI_METHOD_START
    const auto entry = getColumnFromColumnVector(env, obj, block_address, column_position);
    const auto * as_nullable = checkAndGetColumn<DB::ColumnNullable>(entry.column.get());
    const DB::ColumnPtr values = as_nullable ? as_nullable->getNestedColumnPtr() : entry.column;
    return values->getFloat64(row_id);
    LOCAL_ENGINE_JNI_METHOD_END(env, 0.0)
}

/// Returns the value at `row_id` of a string column as a Java string.
/// A nullable column is replaced by its nested column first; the result is then accessed as `DB::ColumnString`.
JNIEXPORT jstring Java_org_apache_gluten_vectorized_CHColumnVector_nativeGetString(
    JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
    LOCAL_ENGINE_JNI_METHOD_START
    const auto entry = getColumnFromColumnVector(env, obj, block_address, column_position);
    const auto * as_nullable = checkAndGetColumn<DB::ColumnNullable>(entry.column.get());
    const DB::ColumnPtr values = as_nullable ? as_nullable->getNestedColumnPtr() : entry.column;

    const auto * strings = checkAndGetColumn<DB::ColumnString>(values.get());
    const auto text = strings->getDataAt(row_id).toString();
    return local_engine::charTojstring(env, text.c_str());
    LOCAL_ENGINE_JNI_METHOD_END(env, local_engine::charTojstring(env, ""))
}

// native block

/// Intentionally empty: nothing is released for the block from this entry point.
JNIEXPORT void Java_org_apache_gluten_vectorized_CHNativeBlock_nativeClose(JNIEnv * /*env*/, jobject /*obj*/, jlong /*block_address*/)
{
}

/// Returns `rows()` of the block at `block_address`.
JNIEXPORT jint Java_org_apache_gluten_vectorized_CHNativeBlock_nativeNumRows(JNIEnv * env, jobject /*obj*/, jlong block_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    return reinterpret_cast<DB::Block *>(block_address)->rows();
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}

/// Returns `columns()` of the block at `block_address`.
JNIEXPORT jint Java_org_apache_gluten_vectorized_CHNativeBlock_nativeNumColumns(JNIEnv * env, jobject /*obj*/, jlong block_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    return reinterpret_cast<DB::Block *>(block_address)->columns();
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}

// Export prefix of CHNativeBlock_nativeColumnType; its return type and declarator follow on the next source line.
JNIEXPORT
jbyteArray Java_org_apache_gluten_vectorized_CHNativeBlock_nativeColumnType(JNIEnv * env, jobject /*obj*/, jlong block_address, jint position) { LOCAL_ENGINE_JNI_METHOD_START auto * block = reinterpret_cast<DB::Block *>(block_address); const auto & col = block->getByPosition(position); std::string substrait_type; dbms::SerializedPlanBuilder::buildType(col.type, substrait_type); return local_engine::stringTojbyteArray(env, substrait_type); LOCAL_ENGINE_JNI_METHOD_END(env, local_engine::stringTojbyteArray(env, "")) } JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHNativeBlock_nativeTotalBytes(JNIEnv * env, jobject /*obj*/, jlong block_address) { LOCAL_ENGINE_JNI_METHOD_START auto * block = reinterpret_cast<DB::Block *>(block_address); return block->bytes(); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT jobject Java_org_apache_gluten_vectorized_CHNativeBlock_nativeBlockStats(JNIEnv * env, jobject obj, jlong block_address, jint column_position) { LOCAL_ENGINE_JNI_METHOD_START DB::Block * block = reinterpret_cast<DB::Block *>(block_address); auto col = getColumnFromColumnVector(env, obj, block_address, column_position); if (!col.column->isNullable()) { jobject block_stats = env->NewObject(block_stats_class, block_stats_constructor, block->rows(), false); return block_stats; } else { const auto * nullable = checkAndGetColumn<DB::ColumnNullable>(&*col.column); const auto & null_map_data = nullable->getNullMapData(); jobject block_stats = env->NewObject( block_stats_class, block_stats_constructor, block->rows(), !DB::memoryIsZero(null_map_data.data(), 0, null_map_data.size())); return block_stats; } LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) } JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHNativeBlock_copyBlock(JNIEnv * env, jobject obj, jlong block_address) { LOCAL_ENGINE_JNI_METHOD_START DB::Block * block = reinterpret_cast<DB::Block *>(block_address); auto copied_block = block->cloneWithColumns(block->getColumns()); auto * a = new 
DB::Block(std::move(copied_block)); return reinterpret_cast<jlong>(a); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHNativeBlock_nativeSlice(JNIEnv * env, jobject /* obj */, jlong block_address, jint offset, jint limit) { LOCAL_ENGINE_JNI_METHOD_START DB::Block * block = reinterpret_cast<DB::Block *>(block_address); DB::Block cut_block = block->cloneWithCutColumns(offset, limit); return reinterpret_cast<jlong>(new DB::Block(std::move(cut_block))); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHStreamReader_createNativeShuffleReader( JNIEnv * env, jclass /*clazz*/, jobject input_stream, jboolean compressed, jlong max_shuffle_read_rows, jlong max_shuffle_read_bytes) { LOCAL_ENGINE_JNI_METHOD_START auto * input = env->NewGlobalRef(input_stream); auto read_buffer = std::make_unique<local_engine::ReadBufferFromJavaShuffleInputStream>(input); auto * shuffle_reader = new local_engine::ShuffleReader(std::move(read_buffer), compressed, max_shuffle_read_rows, max_shuffle_read_bytes); return reinterpret_cast<jlong>(shuffle_reader); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHStreamReader_nativeNext(JNIEnv * env, jobject /*obj*/, jlong shuffle_reader) { LOCAL_ENGINE_JNI_METHOD_START local_engine::ShuffleReader * reader = reinterpret_cast<local_engine::ShuffleReader *>(shuffle_reader); DB::Block * block = reader->read(); return reinterpret_cast<jlong>(block); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHStreamReader_directRead( JNIEnv * env, jclass /*clazz*/, jobject input_stream, jbyteArray buffer, jint buffer_size) { LOCAL_ENGINE_JNI_METHOD_START // auto * input = env->NewGlobalRef(input_stream); auto rb = std::make_unique<local_engine::ReadBufferFromJavaInputStream>(input_stream, buffer, buffer_size); auto reader = std::make_unique<local_engine::NativeReader>(*rb); DB::Block block = 
reader->read(); DB::Block * res = new DB::Block(block); return reinterpret_cast<jlong>(res); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT void Java_org_apache_gluten_vectorized_CHStreamReader_nativeClose(JNIEnv * env, jobject /*obj*/, jlong shuffle_reader) { LOCAL_ENGINE_JNI_METHOD_START local_engine::ShuffleReader * reader = reinterpret_cast<local_engine::ShuffleReader *>(shuffle_reader); delete reader; LOCAL_ENGINE_JNI_METHOD_END(env, ) } local_engine::SplitterHolder * buildAndExecuteShuffle( JNIEnv * env, jobject iter, const String & name, const local_engine::SplitOptions & options, jobject rss_pusher = nullptr) { auto current_executor = local_engine::LocalExecutor::getCurrentExecutor(); local_engine::SplitterHolder * splitter = nullptr; // There are two modes of fallback, one is full fallback but uses columnar shuffle, // and the other is partial fallback that creates one or more LocalExecutor. // In full fallback, the current executor does not exist. if (!current_executor.has_value() || current_executor.value()->fallbackMode()) { auto first_block = local_engine::SourceFromJavaIter::peekBlock(env, iter); if (first_block.has_value()) { /// Try to decide header from the first block read from Java iterator. auto header = first_block.value().cloneEmpty(); splitter = new local_engine::SplitterHolder{ .exchange_manager = std::make_unique<local_engine::SparkExchangeManager>(header, name, options, rss_pusher)}; splitter->exchange_manager->initSinks(1); splitter->exchange_manager->pushBlock(first_block.value()); first_block = std::nullopt; // in fallback mode, spark's whole stage code gen operator uses TaskContext and needs to be executed in the task thread. 
while (auto block = local_engine::SourceFromJavaIter::peekBlock(env, iter)) splitter->exchange_manager->pushBlock(block.value()); } else // empty iterator splitter = new local_engine::SplitterHolder{ .exchange_manager = std::make_unique<local_engine::SparkExchangeManager>(DB::Block(), name, options, rss_pusher)}; } else { splitter = new local_engine::SplitterHolder{ .exchange_manager = std::make_unique<local_engine::SparkExchangeManager>( current_executor.value()->getHeader().cloneEmpty(), name, options, rss_pusher)}; // TODO support multiple sinks splitter->exchange_manager->initSinks(1); current_executor.value()->setSinks([&](auto & pipeline_builder) { splitter->exchange_manager->setSinksToPipeline(pipeline_builder); }); // execute pipeline current_executor.value()->execute(); } return splitter; } // Splitter Jni Wrapper JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_nativeMake( JNIEnv * env, jobject, jobject iter, jstring short_name, jint num_partitions, jbyteArray expr_list, jbyteArray out_expr_list, jint shuffle_id, jlong map_id, jint split_size, jstring codec, jint compress_level, jstring data_file, jstring local_dirs, jint num_sub_dirs, jlong spill_threshold, jstring hash_algorithm, jlong max_sort_buffer_size, jboolean force_memory_sort) { LOCAL_ENGINE_JNI_METHOD_START std::string hash_exprs; std::string out_exprs; if (expr_list != nullptr) { const auto expr_list_a = local_engine::getByteArrayElementsSafe(env, expr_list); const std::string::size_type expr_list_size = expr_list_a.length(); hash_exprs = std::string{reinterpret_cast<const char *>(expr_list_a.elems()), expr_list_size}; } if (out_expr_list != nullptr) { const auto out_expr_list_a = local_engine::getByteArrayElementsSafe(env, out_expr_list); const std::string::size_type out_expr_list_size = out_expr_list_a.length(); out_exprs = std::string{reinterpret_cast<const char *>(out_expr_list_a.elems()), out_expr_list_size}; } Poco::StringTokenizer 
local_dirs_tokenizer(jstring2string(env, local_dirs), ","); std::vector<std::string> local_dirs_list; local_dirs_list.insert(local_dirs_list.end(), local_dirs_tokenizer.begin(), local_dirs_tokenizer.end()); local_engine::SplitOptions options{ .split_size = static_cast<size_t>(split_size), .io_buffer_size = DB::DBMS_DEFAULT_BUFFER_SIZE, .data_file = jstring2string(env, data_file), .local_dirs_list = std::move(local_dirs_list), .num_sub_dirs = num_sub_dirs, .shuffle_id = shuffle_id, .map_id = static_cast<int>(map_id), .partition_num = static_cast<size_t>(num_partitions), .hash_exprs = hash_exprs, .out_exprs = out_exprs, .compress_method = jstring2string(env, codec), .compress_level = compress_level < 0 ? std::nullopt : std::optional<int>(compress_level), .spill_threshold = static_cast<size_t>(spill_threshold), .hash_algorithm = jstring2string(env, hash_algorithm), .max_sort_buffer_size = static_cast<size_t>(max_sort_buffer_size), .force_memory_sort = static_cast<bool>(force_memory_sort)}; auto name = jstring2string(env, short_name); return reinterpret_cast<jlong>(buildAndExecuteShuffle(env, iter, name, options)); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_nativeMakeForRSS( JNIEnv * env, jobject, jobject iter, jstring short_name, jint num_partitions, jbyteArray expr_list, jbyteArray out_expr_list, jint shuffle_id, jlong map_id, jint split_size, jstring codec, jint compress_level, jlong spill_threshold, jstring hash_algorithm, jobject pusher, jboolean force_memory_sort) { LOCAL_ENGINE_JNI_METHOD_START std::string hash_exprs; std::string out_exprs; if (expr_list != nullptr) { const auto expr_list_a = local_engine::getByteArrayElementsSafe(env, expr_list); const std::string::size_type expr_list_size = expr_list_a.length(); hash_exprs = std::string{reinterpret_cast<const char *>(expr_list_a.elems()), expr_list_size}; } if (out_expr_list != nullptr) { const auto out_expr_list_a = 
local_engine::getByteArrayElementsSafe(env, out_expr_list); const std::string::size_type out_expr_list_size = out_expr_list_a.length(); out_exprs = std::string{reinterpret_cast<const char *>(out_expr_list_a.elems()), out_expr_list_size}; } local_engine::SplitOptions options{ .split_size = static_cast<size_t>(split_size), .io_buffer_size = DB::DBMS_DEFAULT_BUFFER_SIZE, .shuffle_id = shuffle_id, .map_id = static_cast<int>(map_id), .partition_num = static_cast<size_t>(num_partitions), .hash_exprs = hash_exprs, .out_exprs = out_exprs, .compress_method = jstring2string(env, codec), .compress_level = compress_level < 0 ? std::nullopt : std::optional<int>(compress_level), .spill_threshold = static_cast<size_t>(spill_threshold), .hash_algorithm = jstring2string(env, hash_algorithm), .force_memory_sort = static_cast<bool>(force_memory_sort)}; auto name = jstring2string(env, short_name); return reinterpret_cast<jlong>(buildAndExecuteShuffle(env, iter, name, options, pusher)); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT jobject Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_stop(JNIEnv * env, jobject, jlong splitterId) { LOCAL_ENGINE_JNI_METHOD_START local_engine::SplitterHolder * splitter = reinterpret_cast<local_engine::SplitterHolder *>(splitterId); splitter->exchange_manager->finish(); auto result = splitter->exchange_manager->getSplitResult(); const auto & partition_lengths = result.partition_lengths; auto * partition_length_arr = env->NewLongArray(partition_lengths.size()); const auto * src = reinterpret_cast<const jlong *>(partition_lengths.data()); env->SetLongArrayRegion(partition_length_arr, 0, partition_lengths.size(), src); const auto & raw_partition_lengths = result.raw_partition_lengths; auto * raw_partition_length_arr = env->NewLongArray(raw_partition_lengths.size()); const auto * raw_src = reinterpret_cast<const jlong *>(raw_partition_lengths.data()); env->SetLongArrayRegion(raw_partition_length_arr, 0, raw_partition_lengths.size(), 
raw_src); // AQE has dependency on total_bytes_written, if the data is wrong, it will generate inappropriate plan // add a log here for remining this. if (result.total_rows && !result.total_bytes_written) LOG_WARNING(getLogger("CHShuffleSplitterJniWrapper"), "total_bytes_written is 0, something may be wrong"); jobject split_result = env->NewObject( split_result_class, split_result_constructor, result.total_compute_pid_time, result.total_write_time, result.total_spill_time, result.total_compress_time, result.total_bytes_written, result.total_bytes_spilled, partition_length_arr, raw_partition_length_arr, result.total_split_time, result.total_io_time, result.total_serialize_time, result.total_rows, result.total_blocks, result.wall_time); return split_result; LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) } JNIEXPORT void Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_close(JNIEnv * env, jobject, jlong splitterId) { LOCAL_ENGINE_JNI_METHOD_START local_engine::SplitterHolder * splitter = reinterpret_cast<local_engine::SplitterHolder *>(splitterId); delete splitter; LOCAL_ENGINE_JNI_METHOD_END(env, ) } // CHBlockConverterJniWrapper JNIEXPORT jobject Java_org_apache_gluten_vectorized_CHBlockConverterJniWrapper_convertColumnarToRow( JNIEnv * env, jclass, jlong block_address, jintArray masks) { LOCAL_ENGINE_JNI_METHOD_START local_engine::MaskVector mask = nullptr; DB::Block * block = reinterpret_cast<DB::Block *>(block_address); if (masks != nullptr) { auto safeArray = local_engine::getIntArrayElementsSafe(env, masks); mask = std::make_unique<std::vector<size_t>>(); for (int j = 0; j < safeArray.length(); j++) mask->push_back(safeArray.elems()[j]); } local_engine::CHColumnToSparkRow converter; std::unique_ptr<local_engine::SparkRowInfo> spark_row_info = converter.convertCHColumnToSparkRow(*block, mask); return local_engine::SparkRowInfoJNI::create(env, *spark_row_info); LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) } JNIEXPORT void 
Java_org_apache_gluten_vectorized_CHBlockConverterJniWrapper_freeMemory(JNIEnv * env, jclass, jlong address, jlong size) { LOCAL_ENGINE_JNI_METHOD_START local_engine::CHColumnToSparkRow converter; converter.freeMem(reinterpret_cast<char *>(address), size); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHBlockConverterJniWrapper_convertSparkRowsToCHColumn( JNIEnv * env, jclass, jobject java_iter, jobjectArray names, jobjectArray types) { LOCAL_ENGINE_JNI_METHOD_START using namespace std; int num_columns = env->GetArrayLength(names); vector<string> c_names; vector<string> c_types; c_names.reserve(num_columns); for (int i = 0; i < num_columns; i++) { auto * name = static_cast<jstring>(env->GetObjectArrayElement(names, i)); c_names.emplace_back(jstring2string(env, name)); auto * type = static_cast<jbyteArray>(env->GetObjectArrayElement(types, i)); auto type_length = env->GetArrayLength(type); jbyte * type_ptr = env->GetByteArrayElements(type, nullptr); string str_type(reinterpret_cast<const char *>(type_ptr), type_length); c_types.emplace_back(std::move(str_type)); env->ReleaseByteArrayElements(type, type_ptr, JNI_ABORT); env->DeleteLocalRef(name); env->DeleteLocalRef(type); } auto * block = local_engine::SparkRowToCHColumn::convertSparkRowItrToCHColumn(java_iter, c_names, c_types); return reinterpret_cast<jlong>(block); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT void Java_org_apache_gluten_vectorized_CHBlockConverterJniWrapper_freeBlock(JNIEnv * env, jclass, jlong block_address) { LOCAL_ENGINE_JNI_METHOD_START local_engine::SparkRowToCHColumn::freeBlock(reinterpret_cast<DB::Block *>(block_address)); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHBlockWriterJniWrapper_nativeCreateInstance(JNIEnv * env, jobject) { LOCAL_ENGINE_JNI_METHOD_START auto * writer = new local_engine::NativeWriterInMemory(); return reinterpret_cast<jlong>(writer); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } 
JNIEXPORT void Java_org_apache_gluten_vectorized_CHBlockWriterJniWrapper_nativeWrite(JNIEnv * env, jobject, jlong instance, jlong block_address) { LOCAL_ENGINE_JNI_METHOD_START auto * writer = reinterpret_cast<local_engine::NativeWriterInMemory *>(instance); auto * block = reinterpret_cast<DB::Block *>(block_address); writer->write(*block); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jint Java_org_apache_gluten_vectorized_CHBlockWriterJniWrapper_nativeResultSize(JNIEnv * env, jobject, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START auto * writer = reinterpret_cast<local_engine::NativeWriterInMemory *>(instance); return static_cast<jint>(writer->collect().size()); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT void Java_org_apache_gluten_vectorized_CHBlockWriterJniWrapper_nativeCollect(JNIEnv * env, jobject, jlong instance, jbyteArray result) { LOCAL_ENGINE_JNI_METHOD_START auto * writer = reinterpret_cast<local_engine::NativeWriterInMemory *>(instance); auto data = writer->collect(); env->SetByteArrayRegion(result, 0, data.size(), reinterpret_cast<const jbyte *>(data.data())); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT void Java_org_apache_gluten_vectorized_CHBlockWriterJniWrapper_nativeClose(JNIEnv * env, jobject, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START auto * writer = reinterpret_cast<local_engine::NativeWriterInMemory *>(instance); delete writer; LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_createFilerWriter( JNIEnv * env, jobject, jstring file_uri_, jbyteArray writeRel) { LOCAL_ENGINE_JNI_METHOD_START const auto writeRelBytes = local_engine::getByteArrayElementsSafe(env, writeRel); substrait::WriteRel write_rel = local_engine::BinaryToMessage<substrait::WriteRel>( {reinterpret_cast<const char *>(writeRelBytes.elems()), static_cast<size_t>(writeRelBytes.length())}); assert(write_rel.has_named_table()); const substrait::NamedObjectWrite & named_table = 
write_rel.named_table(); local_engine::Write write_opt; named_table.advanced_extension().optimization().UnpackTo(&write_opt); DB::Block preferred_schema = local_engine::TypeParser::buildBlockFromNamedStructWithoutDFS(write_rel.table_schema()); const auto file_uri = jstring2string(env, file_uri_); // for HiveFileFormat, the file url may not end with .parquet, so we pass in the format as a hint const auto context = local_engine::QueryContext::instance().currentQueryContext(); auto * writer = local_engine::NormalFileWriter::create(context, file_uri, preferred_schema, write_opt.common().format()).release(); return reinterpret_cast<jlong>(writer); LOCAL_ENGINE_JNI_METHOD_END(env, 0) } JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_createMergeTreeWriter( JNIEnv * env, jobject, jbyteArray writeRel, jbyteArray conf_plan) { LOCAL_ENGINE_JNI_METHOD_START auto query_context = local_engine::QueryContext::instance().currentQueryContext(); // by task update new configs (in case of dynamic config update) const auto conf_plan_a = local_engine::getByteArrayElementsSafe(env, conf_plan); local_engine::SparkConfigs::updateConfig( query_context, {reinterpret_cast<const char *>(conf_plan_a.elems()), static_cast<size_t>(conf_plan_a.length())}); const auto writeRelBytes = local_engine::getByteArrayElementsSafe(env, writeRel); substrait::WriteRel write_rel = local_engine::BinaryToMessage<substrait::WriteRel>( {reinterpret_cast<const char *>(writeRelBytes.elems()), static_cast<size_t>(writeRelBytes.length())}); assert(write_rel.has_named_table()); const substrait::NamedObjectWrite & named_table = write_rel.named_table(); local_engine::Write write; if (!named_table.advanced_extension().optimization().UnpackTo(&write)) throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Failed to unpack write optimization with local_engine::Write."); assert(write.has_common()); assert(write.has_mergetree()); local_engine::MergeTreeTable merge_tree_table(write, 
write_rel.table_schema()); const std::string & id = write.common().job_task_attempt_id(); return reinterpret_cast<jlong>(local_engine::SparkMergeTreeWriter::create(merge_tree_table, query_context, id).release()); LOCAL_ENGINE_JNI_METHOD_END(env, 0) } JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_filterRangesOnDriver( JNIEnv * env, jclass, jbyteArray plan_, jbyteArray read_) { LOCAL_ENGINE_JNI_METHOD_START const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan_); auto plan_pb = local_engine::BinaryToMessage<substrait::Plan>( {reinterpret_cast<const char *>(plan_a.elems()), static_cast<size_t>(plan_a.length())}); auto parser_context = local_engine::ParserContext::build(local_engine::QueryContext::globalContext(), plan_pb); local_engine::SerializedPlanParser parser(parser_context); const auto read_a = local_engine::getByteArrayElementsSafe(env, read_); auto read_pb = local_engine::BinaryToMessage<substrait::Rel>( {reinterpret_cast<const char *>(read_a.elems()), static_cast<size_t>(read_a.length())}); local_engine::MergeTreeRelParser mergeTreeParser(parser_context, local_engine::QueryContext::globalContext()); auto res = mergeTreeParser.filterRangesOnDriver(read_pb.read()); return local_engine::charTojstring(env, res.c_str()); LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) } JNIEXPORT void Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_write(JNIEnv * env, jobject, jlong instanceId, jlong block_address) { LOCAL_ENGINE_JNI_METHOD_START auto * writer = reinterpret_cast<local_engine::NativeOutputWriter *>(instanceId); auto * block = reinterpret_cast<DB::Block *>(block_address); writer->write(*block); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT void Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_close(JNIEnv * env, jobject, jlong instanceId) { LOCAL_ENGINE_JNI_METHOD_START auto * writer = reinterpret_cast<local_engine::NativeOutputWriter *>(instanceId); SCOPE_EXIT({ delete 
writer; }); writer->close(); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_nativeMergeMTParts( JNIEnv * env, jclass, jbyteArray split_info_, jstring partition_dir_, jstring bucket_dir_) { LOCAL_ENGINE_JNI_METHOD_START const auto uuid_str = toString(DB::UUIDHelpers::generateV4()); const auto partition_dir = jstring2string(env, partition_dir_); const auto bucket_dir = jstring2string(env, bucket_dir_); const auto split_info_a = local_engine::getByteArrayElementsSafe(env, split_info_); auto extension_table = local_engine::BinaryToMessage<substrait::ReadRel::ExtensionTable>( {reinterpret_cast<const char *>(split_info_a.elems()), static_cast<size_t>(split_info_a.length())}); local_engine::MergeTreeTableInstance merge_tree_table(extension_table); auto context = local_engine::QueryContext::instance().currentQueryContext(); // each task, using its own CustomStorageMergeTree, doesn't reuse auto temp_storage = merge_tree_table.copyToVirtualStorage(context); // prefetch all needed parts metadata before merge local_engine::restoreMetaData(temp_storage, merge_tree_table, *context); // to release temp CustomStorageMergeTree with RAII DB::StorageID storage_id = temp_storage->getStorageID(); SCOPE_EXIT({ local_engine::StorageMergeTreeFactory::freeStorage(storage_id); }); std::vector<DB::DataPartPtr> selected_parts = local_engine::StorageMergeTreeFactory::getDataPartsByNames(temp_storage->getStorageID(), "", merge_tree_table.getPartNames()); DB::MergeTreeDataPartPtr loaded = local_engine::mergeParts(selected_parts, uuid_str, *temp_storage, partition_dir, bucket_dir); std::vector<local_engine::PartInfo> res; saveFileStatus(*temp_storage, context, loaded->name, const_cast<DB::IDataPartStorage &>(loaded->getDataPartStorage())); res.emplace_back(local_engine::PartInfo{ loaded->name, loaded->getMarksCount(), loaded->getBytesOnDisk(), loaded->rows_count, partition_dir, bucket_dir}); auto json_info = 
local_engine::PartInfo::toJson(res); return local_engine::charTojstring(env, json_info.c_str()); LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) } JNIEXPORT jobject Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_splitBlockByPartitionAndBucket( JNIEnv * env, jclass, jlong blockAddress, jintArray partitionColIndice, jboolean hasBucket) { LOCAL_ENGINE_JNI_METHOD_START auto * block = reinterpret_cast<DB::Block *>(blockAddress); auto safeArray = local_engine::getIntArrayElementsSafe(env, partitionColIndice); std::vector<size_t> partition_col_indice_vec; for (int i = 0; i < safeArray.length(); ++i) partition_col_indice_vec.push_back(safeArray.elems()[i]); auto query_context = local_engine::QueryContext::instance().currentQueryContext(); const DB::Settings & settings = query_context->getSettingsRef(); bool reserve_ = local_engine::settingsEqual(settings, "gluten.write.reserve_partition_columns", "true"); local_engine::BlockStripes bs = local_engine::BlockStripeSplitter::split(*block, partition_col_indice_vec, hasBucket, reserve_); auto * addresses = env->NewLongArray(bs.block_addresses.size()); env->SetLongArrayRegion(addresses, 0, bs.block_addresses.size(), reinterpret_cast<const jlong *>(bs.block_addresses.data())); auto * indices = env->NewIntArray(bs.heading_row_indice.size()); env->SetIntArrayRegion(indices, 0, bs.heading_row_indice.size(), bs.heading_row_indice.data()); jobject block_stripes = env->NewObject( block_stripes_class, block_stripes_constructor, bs.origin_block_address, addresses, indices, bs.origin_block_num_columns); return block_stripes; LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) } JNIEXPORT jlong Java_org_apache_gluten_vectorized_StorageJoinBuilder_nativeBuild( JNIEnv * env, jclass, jstring key, jbyteArray in, jlong row_count_, jstring join_key_, jint join_type_, jboolean has_mixed_join_condition, jboolean is_existence_join, jbyteArray named_struct, jboolean is_null_aware_anti_join, jboolean has_null_key_values) { 
LOCAL_ENGINE_JNI_METHOD_START const auto hash_table_id = jstring2string(env, key); const auto join_key = jstring2string(env, join_key_); const auto named_struct_a = local_engine::getByteArrayElementsSafe(env, named_struct); const std::string::size_type struct_size = named_struct_a.length(); std::string struct_string{reinterpret_cast<const char *>(named_struct_a.elems()), struct_size}; const jsize length = env->GetArrayLength(in); local_engine::ReadBufferFromByteArray read_buffer_from_java_array(in, length); DB::CompressedReadBuffer input(read_buffer_from_java_array); local_engine::configureCompressedReadBuffer(input); const auto * obj = make_wrapper(local_engine::BroadCastJoinBuilder::buildJoin( hash_table_id, input, row_count_, join_key, join_type_, has_mixed_join_condition, is_existence_join, struct_string, is_null_aware_anti_join, has_null_key_values)); return obj->instance(); LOCAL_ENGINE_JNI_METHOD_END(env, 0) } JNIEXPORT jlong Java_org_apache_gluten_vectorized_StorageJoinBuilder_nativeCloneBuildHashTable(JNIEnv * env, jclass, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START auto * cloned = local_engine::make_wrapper(local_engine::SharedPointerWrapper<local_engine::StorageJoinFromReadBuffer>::sharedPtr(instance)); return cloned->instance(); LOCAL_ENGINE_JNI_METHOD_END(env, 0) } JNIEXPORT void Java_org_apache_gluten_vectorized_StorageJoinBuilder_nativeCleanBuildHashTable(JNIEnv * env, jclass, jstring hash_table_id_, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START auto hash_table_id = jstring2string(env, hash_table_id_); local_engine::BroadCastJoinBuilder::cleanBuildHashTable(hash_table_id, instance); LOCAL_ENGINE_JNI_METHOD_END(env, ) } // BlockSplitIterator JNIEXPORT jlong Java_org_apache_gluten_vectorized_BlockSplitIterator_nativeCreate( JNIEnv * env, jobject, jobject in, jstring name, jstring expr, jstring schema, jint partition_num, jint buffer_size, jstring hash_algorithm) { LOCAL_ENGINE_JNI_METHOD_START local_engine::NativeSplitter::Options options; 
options.partition_num = partition_num; options.buffer_size = buffer_size; auto hash_algorithm_str = jstring2string(env, hash_algorithm); options.hash_algorithm.swap(hash_algorithm_str); auto expr_str = jstring2string(env, expr); std::string schema_str; if (schema) schema_str = jstring2string(env, schema); options.exprs_buffer.swap(expr_str); options.schema_buffer.swap(schema_str); local_engine::NativeSplitter::Holder * splitter = new local_engine::NativeSplitter::Holder{ .splitter = local_engine::NativeSplitter::create(jstring2string(env, name), options, in)}; return reinterpret_cast<jlong>(splitter); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT void Java_org_apache_gluten_vectorized_BlockSplitIterator_nativeClose(JNIEnv * env, jobject, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START local_engine::NativeSplitter::Holder * splitter = reinterpret_cast<local_engine::NativeSplitter::Holder *>(instance); delete splitter; LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jboolean Java_org_apache_gluten_vectorized_BlockSplitIterator_nativeHasNext(JNIEnv * env, jobject, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START local_engine::NativeSplitter::Holder * splitter = reinterpret_cast<local_engine::NativeSplitter::Holder *>(instance); return splitter->splitter->hasNext(); LOCAL_ENGINE_JNI_METHOD_END(env, false) } JNIEXPORT jlong Java_org_apache_gluten_vectorized_BlockSplitIterator_nativeNext(JNIEnv * env, jobject, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START local_engine::NativeSplitter::Holder * splitter = reinterpret_cast<local_engine::NativeSplitter::Holder *>(instance); return reinterpret_cast<jlong>(splitter->splitter->next()); LOCAL_ENGINE_JNI_METHOD_END(env, false) } JNIEXPORT jint Java_org_apache_gluten_vectorized_BlockSplitIterator_nativeNextPartitionId(JNIEnv * env, jobject, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START local_engine::NativeSplitter::Holder * splitter = reinterpret_cast<local_engine::NativeSplitter::Holder *>(instance); return 
reinterpret_cast<jint>(splitter->splitter->nextPartitionId()); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT jlong Java_org_apache_gluten_vectorized_BlockOutputStream_nativeCreate( JNIEnv * env, jobject, jobject output_stream, jbyteArray buffer, jstring codec, jint level, jboolean compressed, jint customize_buffer_size) { LOCAL_ENGINE_JNI_METHOD_START local_engine::ShuffleWriter * writer = new local_engine::ShuffleWriter(output_stream, buffer, jstring2string(env, codec), level, compressed, customize_buffer_size); return reinterpret_cast<jlong>(writer); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT long Java_org_apache_gluten_vectorized_BlockOutputStream_directWrite( JNIEnv * env, jclass, jobject output_stream, jbyteArray buffer, jint customize_buffer_size, jlong block_address) { LOCAL_ENGINE_JNI_METHOD_START DB::Block * block = reinterpret_cast<DB::Block *>(block_address); auto wb = std::make_shared<local_engine::WriteBufferFromJavaOutputStream>(output_stream, buffer, customize_buffer_size); auto native_writer = std::make_unique<local_engine::NativeWriter>(*wb, block->cloneEmpty()); auto write_size = native_writer->write(*block); native_writer->flush(); wb->finalize(); return write_size; LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT void Java_org_apache_gluten_vectorized_BlockOutputStream_nativeClose(JNIEnv * env, jobject, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START local_engine::ShuffleWriter * writer = reinterpret_cast<local_engine::ShuffleWriter *>(instance); writer->flush(); delete writer; LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT void Java_org_apache_gluten_vectorized_BlockOutputStream_nativeWrite(JNIEnv * env, jobject, jlong instance, jlong block_address) { LOCAL_ENGINE_JNI_METHOD_START local_engine::ShuffleWriter * writer = reinterpret_cast<local_engine::ShuffleWriter *>(instance); DB::Block * block = reinterpret_cast<DB::Block *>(block_address); writer->write(*block); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT void 
Java_org_apache_gluten_vectorized_BlockOutputStream_nativeFlush(JNIEnv * env, jobject, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START local_engine::ShuffleWriter * writer = reinterpret_cast<local_engine::ShuffleWriter *>(instance); writer->flush(); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jlong Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIEnv * env, jclass, jobject input, jbyteArray plan) { LOCAL_ENGINE_JNI_METHOD_START const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan); const std::string::size_type plan_size = plan_a.length(); auto plan_pb = local_engine::BinaryToMessage<substrait::Plan>({reinterpret_cast<const char *>(plan_a.elems()), plan_size}); auto parser_context = local_engine::ParserContext::build(local_engine::QueryContext::globalContext(), plan_pb); local_engine::SerializedPlanParser parser(parser_context); const jobject iter = env->NewGlobalRef(input); parser.addInputIter(iter, false); local_engine::LocalExecutor * executor = parser.createExecutor(plan_pb).release(); return reinterpret_cast<jlong>(executor); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT void Java_org_apache_gluten_vectorized_SimpleExpressionEval_nativeClose(JNIEnv * env, jclass, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(instance); delete executor; LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jboolean Java_org_apache_gluten_vectorized_SimpleExpressionEval_nativeHasNext(JNIEnv * env, jclass, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(instance); return executor->hasNext(); LOCAL_ENGINE_JNI_METHOD_END(env, false) } JNIEXPORT jlong Java_org_apache_gluten_vectorized_SimpleExpressionEval_nativeNext(JNIEnv * env, jclass, jlong instance) { LOCAL_ENGINE_JNI_METHOD_START local_engine::LocalExecutor * executor = 
reinterpret_cast<local_engine::LocalExecutor *>(instance); return reinterpret_cast<jlong>(executor->nextColumnar()); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } JNIEXPORT jlong Java_org_apache_gluten_memory_CHThreadGroup_createThreadGroup(JNIEnv * env, jclass, jstring task_id_) { LOCAL_ENGINE_JNI_METHOD_START auto task_id = jstring2string(env, task_id_); return local_engine::QueryContext::instance().initializeQuery(task_id); LOCAL_ENGINE_JNI_METHOD_END(env, 0l) } JNIEXPORT jlong Java_org_apache_gluten_memory_CHThreadGroup_threadGroupPeakMemory(JNIEnv * env, jclass, jlong id) { LOCAL_ENGINE_JNI_METHOD_START return local_engine::QueryContext::instance().currentPeakMemory(id); LOCAL_ENGINE_JNI_METHOD_END(env, 0l) } JNIEXPORT void Java_org_apache_gluten_memory_CHThreadGroup_releaseThreadGroup(JNIEnv * env, jclass, jlong id) { LOCAL_ENGINE_JNI_METHOD_START local_engine::QueryContext::instance().finalizeQuery(id); LOCAL_ENGINE_JNI_METHOD_END(env, ) } // only for UT GlutenClickHouseNativeExceptionSuite JNIEXPORT void Java_org_apache_gluten_utils_TestExceptionUtils_generateNativeException(JNIEnv * env) { LOCAL_ENGINE_JNI_METHOD_START throw DB::Exception(DB::ErrorCodes::UNKNOWN_EXCEPTION, "test native exception"); LOCAL_ENGINE_JNI_METHOD_END(env, ) } JNIEXPORT jstring Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts( JNIEnv * env, jobject, jstring table_, jstring columns_, jboolean only_meta_cache_) { LOCAL_ENGINE_JNI_METHOD_START auto table_def = jstring2string(env, table_); auto columns = jstring2string(env, columns_); Poco::StringTokenizer tokenizer(columns, ","); std::unordered_set<String> column_set; for (const auto & col : tokenizer) column_set.insert(col); local_engine::MergeTreeTableInstance table(table_def); auto id = local_engine::CacheManager::instance().cacheParts(table, column_set, only_meta_cache_); return local_engine::charTojstring(env, id.c_str()); LOCAL_ENGINE_JNI_METHOD_END(env, nullptr); } JNIEXPORT jobject 
Java_org_apache_gluten_execution_CHNativeCacheManager_nativeGetCacheStatus(JNIEnv * env, jobject, jstring id) { LOCAL_ENGINE_JNI_METHOD_START return local_engine::CacheManager::instance().getCacheStatus(env, jstring2string(env, id)); LOCAL_ENGINE_JNI_METHOD_END(env, nullptr); } JNIEXPORT jstring Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheFiles(JNIEnv * env, jobject, jbyteArray files) { LOCAL_ENGINE_JNI_METHOD_START const auto files_bytes = local_engine::getByteArrayElementsSafe(env, files); const std::string::size_type files_bytes_size = files_bytes.length(); std::string_view files_view = {reinterpret_cast<const char *>(files_bytes.elems()), files_bytes_size}; substrait::ReadRel::LocalFiles local_files = local_engine::BinaryToMessage<substrait::ReadRel::LocalFiles>(files_view); auto jobId = local_engine::CacheManager::instance().cacheFiles(local_files); return local_engine::charTojstring(env, jobId.c_str()); LOCAL_ENGINE_JNI_METHOD_END(env, nullptr); } JNIEXPORT void Java_org_apache_gluten_execution_CHNativeCacheManager_removeFiles(JNIEnv * env, jobject, jstring file_, jstring cache_name_) { LOCAL_ENGINE_JNI_METHOD_START auto file = jstring2string(env, file_); auto cache_name = jstring2string(env, cache_name_); local_engine::CacheManager::removeFiles(file, cache_name); LOCAL_ENGINE_JNI_METHOD_END(env, ); } JNIEXPORT jlong Java_org_apache_gluten_vectorized_DeltaWriterJNIWrapper_createDeletionVectorWriter( JNIEnv * env, jclass, jstring table_path_, jint prefix_length_, jlong packingTargetSize_) { LOCAL_ENGINE_JNI_METHOD_START auto table_path = jstring2string(env, table_path_); const auto query_context = local_engine::QueryContext::instance().currentQueryContext(); auto writer = new local_engine::delta::DeltaWriter(query_context, table_path, prefix_length_, packingTargetSize_); return reinterpret_cast<jlong>(writer); LOCAL_ENGINE_JNI_METHOD_END(env, -1); } JNIEXPORT void Java_org_apache_gluten_vectorized_DeltaWriterJNIWrapper_deletionVectorWrite( 
JNIEnv * env, jclass, jlong writer_address_, jlong blockAddress) { LOCAL_ENGINE_JNI_METHOD_START const auto * block = reinterpret_cast<DB::Block *>(blockAddress); auto * writer = reinterpret_cast<local_engine::delta::DeltaWriter *>(writer_address_); writer->writeDeletionVector(*block); LOCAL_ENGINE_JNI_METHOD_END(env, ); } JNIEXPORT jlong Java_org_apache_gluten_vectorized_DeltaWriterJNIWrapper_deletionVectorWriteFinalize(JNIEnv * env, jclass, jlong writer_address_) { LOCAL_ENGINE_JNI_METHOD_START auto * writer = reinterpret_cast<local_engine::delta::DeltaWriter *>(writer_address_); auto * column_batch = writer->finalize(); delete writer; return reinterpret_cast<UInt64>(column_batch); LOCAL_ENGINE_JNI_METHOD_END(env, -1); } #ifdef __cplusplus } #endif