cpp-ch/local-engine/local_engine_jni.cpp (1,213 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <numeric>
#include <string>
#include <jni.h>
#include <Builder/SerializedPlanBuilder.h>
#include <Compression/CompressedReadBuffer.h>
#include <DataTypes/DataTypeNullable.h>
#include <Join/BroadCastJoinBuilder.h>
#include <Parser/CHColumnToSparkRow.h>
#include <Parser/LocalExecutor.h>
#include <Parser/ParserContext.h>
#include <Parser/RelParsers/MergeTreeRelParser.h>
#include <Parser/RelParsers/RelParser.h>
#include <Parser/RelParsers/WriteRelParser.h>
#include <Parser/SerializedPlanParser.h>
#include <Parser/SparkRowToCHColumn.h>
#include <Parser/SubstraitParserUtils.h>
#include <Shuffle/NativeSplitter.h>
#include <Shuffle/NativeWriterInMemory.h>
#include <Shuffle/PartitionWriter.h>
#include <Shuffle/ShuffleCommon.h>
#include <Shuffle/ShuffleReader.h>
#include <Shuffle/ShuffleWriter.h>
#include <Shuffle/SparkExchangeSink.h>
#include <Shuffle/WriteBufferFromJavaOutputStream.h>
#include <Storages/Cache/CacheManager.h>
#include <Storages/MergeTree/MetaDataHelper.h>
#include <Storages/MergeTree/SparkMergeTreeWriteSettings.h>
#include <Storages/MergeTree/SparkMergeTreeWriter.h>
#include <Storages/MergeTree/StorageMergeTreeFactory.h>
#include <Storages/Output/BlockStripeSplitter.h>
#include <Storages/Output/NormalFileWriter.h>
#include <Storages/SubstraitSource/Delta/DeltaWriter.h>
#include <jni/SharedPointerWrapper.h>
#include <jni/jni_common.h>
#include <jni/jni_error.h>
#include <write_optimization.pb.h>
#include <Poco/Logger.h>
#include <Poco/StringTokenizer.h>
#include <Common/CHUtil.h>
#include <Common/ErrorCodes.h>
#include <Common/ExceptionUtils.h>
#include <Common/JNIUtils.h>
#include <Common/QueryContext.h>
#ifdef __cplusplus
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PARSE_PROTOBUF_SCHEMA;
extern const int UNKNOWN_EXCEPTION;
}
}
static DB::ColumnWithTypeAndName getColumnFromColumnVector(JNIEnv * /*env*/, jobject /*obj*/, jlong block_address, jint column_position)
{
DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
return block->getByPosition(column_position);
}
static std::string jstring2string(JNIEnv * env, jstring string)
{
if (string == nullptr)
return std::string();
const char * chars = env->GetStringUTFChars(string, nullptr);
std::string ret(chars);
env->ReleaseStringUTFChars(string, chars);
return ret;
}
extern "C" {
#endif
namespace dbms
{
class LocalExecutor;
}
static jclass block_stripes_class;
static jmethodID block_stripes_constructor;
static jclass split_result_class;
static jmethodID split_result_constructor;
static jclass block_stats_class;
static jmethodID block_stats_constructor;
JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/)
{
JNIEnv * env;
if (vm->GetEnv(reinterpret_cast<void **>(&env), JNI_VERSION_1_8) != JNI_OK)
return JNI_ERR;
local_engine::JniErrorsGlobalState::instance().initialize(env);
block_stripes_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/spark/sql/execution/datasources/BlockStripes;");
block_stripes_constructor = local_engine::GetMethodID(env, block_stripes_class, "<init>", "(J[J[II)V");
split_result_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/vectorized/CHSplitResult;");
split_result_constructor = local_engine::GetMethodID(env, split_result_class, "<init>", "(JJJJJJ[J[JJJJJJJ)V");
block_stats_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/vectorized/BlockStats;");
block_stats_constructor = local_engine::GetMethodID(env, block_stats_class, "<init>", "(JZ)V");
local_engine::ShuffleReader::shuffle_input_stream_class
= local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/vectorized/ShuffleInputStream;");
local_engine::NativeSplitter::iterator_class
= local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/vectorized/IteratorWrapper;");
local_engine::WriteBufferFromJavaOutputStream::output_stream_class
= local_engine::CreateGlobalClassReference(env, "Ljava/io/OutputStream;");
local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class
= local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/execution/ColumnarNativeIterator;");
local_engine::SourceFromJavaIter::serialized_record_batch_iterator_hasNext
= local_engine::GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "hasNext", "()Z");
local_engine::SourceFromJavaIter::serialized_record_batch_iterator_next
= local_engine::GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "next", "()[B");
local_engine::ReadBufferFromJavaInputStream::input_stream_class
= local_engine::CreateGlobalClassReference(env, "Ljava/io/InputStream;");
local_engine::ReadBufferFromJavaInputStream::input_stream_read
= local_engine::GetMethodID(env, local_engine::ReadBufferFromJavaInputStream::input_stream_class, "read", "([B)I");
local_engine::ShuffleReader::shuffle_input_stream_read
= local_engine::GetMethodID(env, local_engine::ShuffleReader::shuffle_input_stream_class, "read", "(JJ)J");
local_engine::NativeSplitter::iterator_has_next
= local_engine::GetMethodID(env, local_engine::NativeSplitter::iterator_class, "hasNext", "()Z");
local_engine::NativeSplitter::iterator_next
= local_engine::GetMethodID(env, local_engine::NativeSplitter::iterator_class, "next", "()J");
local_engine::WriteBufferFromJavaOutputStream::output_stream_write
= local_engine::GetMethodID(env, local_engine::WriteBufferFromJavaOutputStream::output_stream_class, "write", "([BII)V");
local_engine::WriteBufferFromJavaOutputStream::output_stream_flush
= local_engine::GetMethodID(env, local_engine::WriteBufferFromJavaOutputStream::output_stream_class, "flush", "()V");
local_engine::SparkRowToCHColumn::spark_row_interator_class
= local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/execution/SparkRowIterator;");
local_engine::SparkRowToCHColumn::spark_row_interator_hasNext
= local_engine::GetMethodID(env, local_engine::SparkRowToCHColumn::spark_row_interator_class, "hasNext", "()Z");
local_engine::SparkRowToCHColumn::spark_row_interator_next
= local_engine::GetMethodID(env, local_engine::SparkRowToCHColumn::spark_row_interator_class, "next", "()[B");
local_engine::SparkRowToCHColumn::spark_row_iterator_nextBatch = local_engine::GetMethodID(
env, local_engine::SparkRowToCHColumn::spark_row_interator_class, "nextBatch", "()Ljava/nio/ByteBuffer;");
local_engine::BroadCastJoinBuilder::init(env);
local_engine::CacheManager::initJNI(env);
local_engine::SparkMergeTreeWriterJNI::init(env);
local_engine::SparkRowInfoJNI::init(env);
local_engine::JNIUtils::vm = vm;
return JNI_VERSION_1_8;
}
JNIEXPORT void Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeInitNative(JNIEnv * env, jclass, jbyteArray conf_plan)
{
LOCAL_ENGINE_JNI_METHOD_START
const auto conf_plan_a = local_engine::getByteArrayElementsSafe(env, conf_plan);
const std::string::size_type plan_buf_size = conf_plan_a.length();
local_engine::SparkConfigs::update(
{reinterpret_cast<const char *>(conf_plan_a.elems()), plan_buf_size},
[&](const local_engine::SparkConfigs::ConfigMap & spark_conf_map)
{ local_engine::BackendInitializerUtil::initBackend(spark_conf_map); },
true);
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
JNIEXPORT void Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeFinalizeNative(JNIEnv * env, jclass)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::BackendFinalizerUtil::finalizeSessionally();
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
JNIEXPORT void Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeDestroyNative(JNIEnv * env, jclass)
{
LOG_INFO(&Poco::Logger::get("jni"), "start destroy native");
local_engine::BackendFinalizerUtil::finalizeGlobally();
local_engine::JniErrorsGlobalState::instance().destroy(env);
local_engine::BroadCastJoinBuilder::destroy(env);
local_engine::SparkMergeTreeWriterJNI::destroy(env);
local_engine::SparkRowInfoJNI::destroy(env);
env->DeleteGlobalRef(block_stripes_class);
env->DeleteGlobalRef(split_result_class);
env->DeleteGlobalRef(block_stats_class);
env->DeleteGlobalRef(local_engine::ShuffleReader::shuffle_input_stream_class);
env->DeleteGlobalRef(local_engine::NativeSplitter::iterator_class);
env->DeleteGlobalRef(local_engine::WriteBufferFromJavaOutputStream::output_stream_class);
env->DeleteGlobalRef(local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class);
env->DeleteGlobalRef(local_engine::SparkRowToCHColumn::spark_row_interator_class);
}
/// Set settings for the current query. It assumes that all parameters are started with `CH_RUNTIME_SETTINGS_PREFIX` prefix,
/// and the prefix is removed by java before passing to C++.
JNIEXPORT void
Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_updateQueryRuntimeSettings(JNIEnv * env, jclass, jbyteArray settings)
{
LOCAL_ENGINE_JNI_METHOD_START
const auto query_context = local_engine::QueryContext::instance().currentQueryContext();
const auto conf_plan_a = local_engine::getByteArrayElementsSafe(env, settings);
const std::string::size_type conf_plan_size = conf_plan_a.length();
local_engine::updateSettings(query_context, {reinterpret_cast<const char *>(conf_plan_a.elems()), conf_plan_size});
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
JNIEXPORT jlong Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeCreateKernelWithIterator(
JNIEnv * env,
jclass,
jbyteArray plan,
jobjectArray split_infos,
jobjectArray iter_arr,
jbyteArray conf_plan,
jboolean materialize_input,
jint partition_index)
{
LOCAL_ENGINE_JNI_METHOD_START
auto query_context = local_engine::QueryContext::instance().currentQueryContext();
// by task update new configs ( in case of dynamic config update )
const auto conf_plan_a = local_engine::getByteArrayElementsSafe(env, conf_plan);
const std::string::size_type conf_plan_size = conf_plan_a.length();
local_engine::SparkConfigs::updateConfig(query_context, {reinterpret_cast<const char *>(conf_plan_a.elems()), conf_plan_size});
const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan);
const std::string::size_type plan_size = plan_a.length();
auto plan_pb = local_engine::BinaryToMessage<substrait::Plan>({reinterpret_cast<const char *>(plan_a.elems()), plan_size});
auto parser_context = local_engine::ParserContext::build(query_context, plan_pb, partition_index);
local_engine::SerializedPlanParser parser(parser_context);
jsize iter_num = env->GetArrayLength(iter_arr);
for (jsize i = 0; i < iter_num; i++)
{
jobject iter = env->GetObjectArrayElement(iter_arr, i);
iter = env->NewGlobalRef(iter);
parser.addInputIter(iter, materialize_input);
}
for (jsize i = 0, split_info_arr_size = env->GetArrayLength(split_infos); i < split_info_arr_size; i++)
{
jbyteArray split_info = static_cast<jbyteArray>(env->GetObjectArrayElement(split_infos, i));
const auto split_info_a = local_engine::getByteArrayElementsSafe(env, split_info);
const std::string::size_type split_info_size = split_info_a.length();
parser.addSplitInfo({reinterpret_cast<const char *>(split_info_a.elems()), split_info_size});
}
local_engine::LocalExecutor * executor = parser.createExecutor(plan_pb).release();
LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}", reinterpret_cast<uintptr_t>(executor));
executor->setMetric(parser.getMetric());
executor->setExtraPlanHolder(parser.extra_plan_holder);
return reinterpret_cast<jlong>(executor);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
// Columnar Iterator
JNIEXPORT jboolean Java_org_apache_gluten_vectorized_BatchIterator_nativeHasNext(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
return executor->hasNext();
LOCAL_ENGINE_JNI_METHOD_END(env, false)
}
JNIEXPORT jlong Java_org_apache_gluten_vectorized_BatchIterator_nativeCHNext(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
DB::Block * column_batch = executor->nextColumnar();
return reinterpret_cast<UInt64>(column_batch);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
LOCAL_ENGINE_JNI_METHOD_START
auto * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
executor->cancel();
LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", reinterpret_cast<uintptr_t>(executor));
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
JNIEXPORT void Java_org_apache_gluten_vectorized_BatchIterator_nativeClose(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
LOCAL_ENGINE_JNI_METHOD_START
auto * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}", reinterpret_cast<intptr_t>(executor));
local_engine::LocalExecutor::resetCurrentExecutor();
delete executor;
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
JNIEXPORT jstring Java_org_apache_gluten_vectorized_BatchIterator_nativeFetchMetrics(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
LOCAL_ENGINE_JNI_METHOD_START
/// Collect metrics only if optimizations are disabled, otherwise coredump would happen.
const local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
const auto metric = executor->getMetric();
const String metrics_json = metric ? local_engine::RelMetricSerializer::serializeRelMetric(metric) : "";
return local_engine::charTojstring(env, metrics_json.c_str());
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
JNIEXPORT jboolean
Java_org_apache_gluten_vectorized_CHColumnVector_nativeHasNull(JNIEnv * env, jobject obj, jlong block_address, jint column_position)
{
LOCAL_ENGINE_JNI_METHOD_START
DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
if (!col.column->isNullable())
{
return false;
}
else
{
const auto * nullable = checkAndGetColumn<DB::ColumnNullable>(&*col.column);
const auto & null_map_data = nullable->getNullMapData();
return !DB::memoryIsZero(null_map_data.data(), 0, null_map_data.size());
}
LOCAL_ENGINE_JNI_METHOD_END(env, false)
}
JNIEXPORT jint
Java_org_apache_gluten_vectorized_CHColumnVector_nativeNumNulls(JNIEnv * env, jobject obj, jlong block_address, jint column_position)
{
LOCAL_ENGINE_JNI_METHOD_START
auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
if (!col.column->isNullable())
{
return 0;
}
else
{
const auto * nullable = checkAndGetColumn<DB::ColumnNullable>(&*col.column);
return std::accumulate(nullable->getNullMapData().begin(), nullable->getNullMapData().end(), 0);
}
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT jboolean Java_org_apache_gluten_vectorized_CHColumnVector_nativeIsNullAt(
JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
LOCAL_ENGINE_JNI_METHOD_START
auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
return col.column->isNullAt(row_id);
LOCAL_ENGINE_JNI_METHOD_END(env, false)
}
JNIEXPORT jboolean Java_org_apache_gluten_vectorized_CHColumnVector_nativeGetBoolean(
JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
LOCAL_ENGINE_JNI_METHOD_START
auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
DB::ColumnPtr nested_col = col.column;
if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get()))
nested_col = nullable_col->getNestedColumnPtr();
return nested_col->getBool(row_id);
LOCAL_ENGINE_JNI_METHOD_END(env, false)
}
JNIEXPORT jbyte Java_org_apache_gluten_vectorized_CHColumnVector_nativeGetByte(
JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
LOCAL_ENGINE_JNI_METHOD_START
auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
DB::ColumnPtr nested_col = col.column;
if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get()))
nested_col = nullable_col->getNestedColumnPtr();
return reinterpret_cast<const jbyte *>(nested_col->getDataAt(row_id).data)[0];
LOCAL_ENGINE_JNI_METHOD_END(env, 0)
}
JNIEXPORT jshort Java_org_apache_gluten_vectorized_CHColumnVector_nativeGetShort(
JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
LOCAL_ENGINE_JNI_METHOD_START
auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
DB::ColumnPtr nested_col = col.column;
if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get()))
nested_col = nullable_col->getNestedColumnPtr();
return reinterpret_cast<const jshort *>(nested_col->getDataAt(row_id).data)[0];
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT jint Java_org_apache_gluten_vectorized_CHColumnVector_nativeGetInt(
JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
LOCAL_ENGINE_JNI_METHOD_START
auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
DB::ColumnPtr nested_col = col.column;
if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get()))
nested_col = nullable_col->getNestedColumnPtr();
if (col.type->getTypeId() == DB::TypeIndex::Date)
return nested_col->getUInt(row_id);
else
return nested_col->getInt(row_id);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHColumnVector_nativeGetLong(
JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
LOCAL_ENGINE_JNI_METHOD_START
auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
DB::ColumnPtr nested_col = col.column;
if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get()))
nested_col = nullable_col->getNestedColumnPtr();
return nested_col->getInt(row_id);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT jfloat Java_org_apache_gluten_vectorized_CHColumnVector_nativeGetFloat(
JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
LOCAL_ENGINE_JNI_METHOD_START
auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
DB::ColumnPtr nested_col = col.column;
if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get()))
nested_col = nullable_col->getNestedColumnPtr();
return nested_col->getFloat32(row_id);
LOCAL_ENGINE_JNI_METHOD_END(env, 0.0)
}
JNIEXPORT jdouble Java_org_apache_gluten_vectorized_CHColumnVector_nativeGetDouble(
JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
LOCAL_ENGINE_JNI_METHOD_START
auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
DB::ColumnPtr nested_col = col.column;
if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get()))
nested_col = nullable_col->getNestedColumnPtr();
return nested_col->getFloat64(row_id);
LOCAL_ENGINE_JNI_METHOD_END(env, 0.0)
}
JNIEXPORT jstring Java_org_apache_gluten_vectorized_CHColumnVector_nativeGetString(
JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
LOCAL_ENGINE_JNI_METHOD_START
auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
DB::ColumnPtr nested_col = col.column;
if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get()))
nested_col = nullable_col->getNestedColumnPtr();
const auto * string_col = checkAndGetColumn<DB::ColumnString>(nested_col.get());
auto result = string_col->getDataAt(row_id);
return local_engine::charTojstring(env, result.toString().c_str());
LOCAL_ENGINE_JNI_METHOD_END(env, local_engine::charTojstring(env, ""))
}
// native block
JNIEXPORT void Java_org_apache_gluten_vectorized_CHNativeBlock_nativeClose(JNIEnv * /*env*/, jobject /*obj*/, jlong /*block_address*/)
{
}
JNIEXPORT jint Java_org_apache_gluten_vectorized_CHNativeBlock_nativeNumRows(JNIEnv * env, jobject /*obj*/, jlong block_address)
{
LOCAL_ENGINE_JNI_METHOD_START
DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
return block->rows();
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT jint Java_org_apache_gluten_vectorized_CHNativeBlock_nativeNumColumns(JNIEnv * env, jobject /*obj*/, jlong block_address)
{
LOCAL_ENGINE_JNI_METHOD_START
auto * block = reinterpret_cast<DB::Block *>(block_address);
return block->columns();
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT jbyteArray
Java_org_apache_gluten_vectorized_CHNativeBlock_nativeColumnType(JNIEnv * env, jobject /*obj*/, jlong block_address, jint position)
{
LOCAL_ENGINE_JNI_METHOD_START
auto * block = reinterpret_cast<DB::Block *>(block_address);
const auto & col = block->getByPosition(position);
std::string substrait_type;
dbms::SerializedPlanBuilder::buildType(col.type, substrait_type);
return local_engine::stringTojbyteArray(env, substrait_type);
LOCAL_ENGINE_JNI_METHOD_END(env, local_engine::stringTojbyteArray(env, ""))
}
JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHNativeBlock_nativeTotalBytes(JNIEnv * env, jobject /*obj*/, jlong block_address)
{
LOCAL_ENGINE_JNI_METHOD_START
auto * block = reinterpret_cast<DB::Block *>(block_address);
return block->bytes();
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT jobject
Java_org_apache_gluten_vectorized_CHNativeBlock_nativeBlockStats(JNIEnv * env, jobject obj, jlong block_address, jint column_position)
{
LOCAL_ENGINE_JNI_METHOD_START
DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
if (!col.column->isNullable())
{
jobject block_stats = env->NewObject(block_stats_class, block_stats_constructor, block->rows(), false);
return block_stats;
}
else
{
const auto * nullable = checkAndGetColumn<DB::ColumnNullable>(&*col.column);
const auto & null_map_data = nullable->getNullMapData();
jobject block_stats = env->NewObject(
block_stats_class, block_stats_constructor, block->rows(), !DB::memoryIsZero(null_map_data.data(), 0, null_map_data.size()));
return block_stats;
}
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
JNIEXPORT jlong
Java_org_apache_gluten_vectorized_CHNativeBlock_copyBlock(JNIEnv * env, jobject obj, jlong block_address)
{
LOCAL_ENGINE_JNI_METHOD_START
DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
auto copied_block = block->cloneWithColumns(block->getColumns());
auto * a = new DB::Block(std::move(copied_block));
return reinterpret_cast<jlong>(a);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT jlong
Java_org_apache_gluten_vectorized_CHNativeBlock_nativeSlice(JNIEnv * env, jobject /* obj */, jlong block_address, jint offset, jint limit)
{
LOCAL_ENGINE_JNI_METHOD_START
DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
DB::Block cut_block = block->cloneWithCutColumns(offset, limit);
return reinterpret_cast<jlong>(new DB::Block(std::move(cut_block)));
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHStreamReader_createNativeShuffleReader(
JNIEnv * env, jclass /*clazz*/, jobject input_stream, jboolean compressed, jlong max_shuffle_read_rows, jlong max_shuffle_read_bytes)
{
LOCAL_ENGINE_JNI_METHOD_START
auto * input = env->NewGlobalRef(input_stream);
auto read_buffer = std::make_unique<local_engine::ReadBufferFromJavaShuffleInputStream>(input);
auto * shuffle_reader
= new local_engine::ShuffleReader(std::move(read_buffer), compressed, max_shuffle_read_rows, max_shuffle_read_bytes);
return reinterpret_cast<jlong>(shuffle_reader);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHStreamReader_nativeNext(JNIEnv * env, jobject /*obj*/, jlong shuffle_reader)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::ShuffleReader * reader = reinterpret_cast<local_engine::ShuffleReader *>(shuffle_reader);
DB::Block * block = reader->read();
return reinterpret_cast<jlong>(block);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHStreamReader_directRead(
JNIEnv * env, jclass /*clazz*/, jobject input_stream, jbyteArray buffer, jint buffer_size)
{
LOCAL_ENGINE_JNI_METHOD_START
// auto * input = env->NewGlobalRef(input_stream);
auto rb = std::make_unique<local_engine::ReadBufferFromJavaInputStream>(input_stream, buffer, buffer_size);
auto reader = std::make_unique<local_engine::NativeReader>(*rb);
DB::Block block = reader->read();
DB::Block * res = new DB::Block(block);
return reinterpret_cast<jlong>(res);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT void Java_org_apache_gluten_vectorized_CHStreamReader_nativeClose(JNIEnv * env, jobject /*obj*/, jlong shuffle_reader)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::ShuffleReader * reader = reinterpret_cast<local_engine::ShuffleReader *>(shuffle_reader);
delete reader;
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
local_engine::SplitterHolder * buildAndExecuteShuffle(
JNIEnv * env, jobject iter, const String & name, const local_engine::SplitOptions & options, jobject rss_pusher = nullptr)
{
auto current_executor = local_engine::LocalExecutor::getCurrentExecutor();
local_engine::SplitterHolder * splitter = nullptr;
// There are two modes of fallback, one is full fallback but uses columnar shuffle,
// and the other is partial fallback that creates one or more LocalExecutor.
// In full fallback, the current executor does not exist.
if (!current_executor.has_value() || current_executor.value()->fallbackMode())
{
auto first_block = local_engine::SourceFromJavaIter::peekBlock(env, iter);
if (first_block.has_value())
{
/// Try to decide header from the first block read from Java iterator.
auto header = first_block.value().cloneEmpty();
splitter = new local_engine::SplitterHolder{
.exchange_manager = std::make_unique<local_engine::SparkExchangeManager>(header, name, options, rss_pusher)};
splitter->exchange_manager->initSinks(1);
splitter->exchange_manager->pushBlock(first_block.value());
first_block = std::nullopt;
// in fallback mode, spark's whole stage code gen operator uses TaskContext and needs to be executed in the task thread.
while (auto block = local_engine::SourceFromJavaIter::peekBlock(env, iter))
splitter->exchange_manager->pushBlock(block.value());
}
else
// empty iterator
splitter = new local_engine::SplitterHolder{
.exchange_manager = std::make_unique<local_engine::SparkExchangeManager>(DB::Block(), name, options, rss_pusher)};
}
else
{
splitter = new local_engine::SplitterHolder{
.exchange_manager = std::make_unique<local_engine::SparkExchangeManager>(
current_executor.value()->getHeader().cloneEmpty(), name, options, rss_pusher)};
// TODO support multiple sinks
splitter->exchange_manager->initSinks(1);
current_executor.value()->setSinks([&](auto & pipeline_builder)
{ splitter->exchange_manager->setSinksToPipeline(pipeline_builder); });
// execute pipeline
current_executor.value()->execute();
}
return splitter;
}
// Splitter Jni Wrapper
JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_nativeMake(
JNIEnv * env,
jobject,
jobject iter,
jstring short_name,
jint num_partitions,
jbyteArray expr_list,
jbyteArray out_expr_list,
jint shuffle_id,
jlong map_id,
jint split_size,
jstring codec,
jint compress_level,
jstring data_file,
jstring local_dirs,
jint num_sub_dirs,
jlong spill_threshold,
jstring hash_algorithm,
jlong max_sort_buffer_size,
jboolean force_memory_sort)
{
LOCAL_ENGINE_JNI_METHOD_START
std::string hash_exprs;
std::string out_exprs;
if (expr_list != nullptr)
{
const auto expr_list_a = local_engine::getByteArrayElementsSafe(env, expr_list);
const std::string::size_type expr_list_size = expr_list_a.length();
hash_exprs = std::string{reinterpret_cast<const char *>(expr_list_a.elems()), expr_list_size};
}
if (out_expr_list != nullptr)
{
const auto out_expr_list_a = local_engine::getByteArrayElementsSafe(env, out_expr_list);
const std::string::size_type out_expr_list_size = out_expr_list_a.length();
out_exprs = std::string{reinterpret_cast<const char *>(out_expr_list_a.elems()), out_expr_list_size};
}
Poco::StringTokenizer local_dirs_tokenizer(jstring2string(env, local_dirs), ",");
std::vector<std::string> local_dirs_list;
local_dirs_list.insert(local_dirs_list.end(), local_dirs_tokenizer.begin(), local_dirs_tokenizer.end());
local_engine::SplitOptions options{
.split_size = static_cast<size_t>(split_size),
.io_buffer_size = DB::DBMS_DEFAULT_BUFFER_SIZE,
.data_file = jstring2string(env, data_file),
.local_dirs_list = std::move(local_dirs_list),
.num_sub_dirs = num_sub_dirs,
.shuffle_id = shuffle_id,
.map_id = static_cast<int>(map_id),
.partition_num = static_cast<size_t>(num_partitions),
.hash_exprs = hash_exprs,
.out_exprs = out_exprs,
.compress_method = jstring2string(env, codec),
.compress_level = compress_level < 0 ? std::nullopt : std::optional<int>(compress_level),
.spill_threshold = static_cast<size_t>(spill_threshold),
.hash_algorithm = jstring2string(env, hash_algorithm),
.max_sort_buffer_size = static_cast<size_t>(max_sort_buffer_size),
.force_memory_sort = static_cast<bool>(force_memory_sort)};
auto name = jstring2string(env, short_name);
return reinterpret_cast<jlong>(buildAndExecuteShuffle(env, iter, name, options));
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_nativeMakeForRSS(
JNIEnv * env,
jobject,
jobject iter,
jstring short_name,
jint num_partitions,
jbyteArray expr_list,
jbyteArray out_expr_list,
jint shuffle_id,
jlong map_id,
jint split_size,
jstring codec,
jint compress_level,
jlong spill_threshold,
jstring hash_algorithm,
jobject pusher,
jboolean force_memory_sort)
{
LOCAL_ENGINE_JNI_METHOD_START
std::string hash_exprs;
std::string out_exprs;
if (expr_list != nullptr)
{
const auto expr_list_a = local_engine::getByteArrayElementsSafe(env, expr_list);
const std::string::size_type expr_list_size = expr_list_a.length();
hash_exprs = std::string{reinterpret_cast<const char *>(expr_list_a.elems()), expr_list_size};
}
if (out_expr_list != nullptr)
{
const auto out_expr_list_a = local_engine::getByteArrayElementsSafe(env, out_expr_list);
const std::string::size_type out_expr_list_size = out_expr_list_a.length();
out_exprs = std::string{reinterpret_cast<const char *>(out_expr_list_a.elems()), out_expr_list_size};
}
local_engine::SplitOptions options{
.split_size = static_cast<size_t>(split_size),
.io_buffer_size = DB::DBMS_DEFAULT_BUFFER_SIZE,
.shuffle_id = shuffle_id,
.map_id = static_cast<int>(map_id),
.partition_num = static_cast<size_t>(num_partitions),
.hash_exprs = hash_exprs,
.out_exprs = out_exprs,
.compress_method = jstring2string(env, codec),
.compress_level = compress_level < 0 ? std::nullopt : std::optional<int>(compress_level),
.spill_threshold = static_cast<size_t>(spill_threshold),
.hash_algorithm = jstring2string(env, hash_algorithm),
.force_memory_sort = static_cast<bool>(force_memory_sort)};
auto name = jstring2string(env, short_name);
return reinterpret_cast<jlong>(buildAndExecuteShuffle(env, iter, name, options, pusher));
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT jobject Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_stop(JNIEnv * env, jobject, jlong splitterId)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::SplitterHolder * splitter = reinterpret_cast<local_engine::SplitterHolder *>(splitterId);
splitter->exchange_manager->finish();
auto result = splitter->exchange_manager->getSplitResult();
const auto & partition_lengths = result.partition_lengths;
auto * partition_length_arr = env->NewLongArray(partition_lengths.size());
const auto * src = reinterpret_cast<const jlong *>(partition_lengths.data());
env->SetLongArrayRegion(partition_length_arr, 0, partition_lengths.size(), src);
const auto & raw_partition_lengths = result.raw_partition_lengths;
auto * raw_partition_length_arr = env->NewLongArray(raw_partition_lengths.size());
const auto * raw_src = reinterpret_cast<const jlong *>(raw_partition_lengths.data());
env->SetLongArrayRegion(raw_partition_length_arr, 0, raw_partition_lengths.size(), raw_src);
// AQE has dependency on total_bytes_written, if the data is wrong, it will generate inappropriate plan
// add a log here for remining this.
if (result.total_rows && !result.total_bytes_written)
LOG_WARNING(getLogger("CHShuffleSplitterJniWrapper"), "total_bytes_written is 0, something may be wrong");
jobject split_result = env->NewObject(
split_result_class,
split_result_constructor,
result.total_compute_pid_time,
result.total_write_time,
result.total_spill_time,
result.total_compress_time,
result.total_bytes_written,
result.total_bytes_spilled,
partition_length_arr,
raw_partition_length_arr,
result.total_split_time,
result.total_io_time,
result.total_serialize_time,
result.total_rows,
result.total_blocks,
result.wall_time);
return split_result;
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
JNIEXPORT void Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_close(JNIEnv * env, jobject, jlong splitterId)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::SplitterHolder * splitter = reinterpret_cast<local_engine::SplitterHolder *>(splitterId);
delete splitter;
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
// CHBlockConverterJniWrapper
JNIEXPORT jobject Java_org_apache_gluten_vectorized_CHBlockConverterJniWrapper_convertColumnarToRow(
JNIEnv * env, jclass, jlong block_address, jintArray masks)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::MaskVector mask = nullptr;
DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
if (masks != nullptr)
{
auto safeArray = local_engine::getIntArrayElementsSafe(env, masks);
mask = std::make_unique<std::vector<size_t>>();
for (int j = 0; j < safeArray.length(); j++)
mask->push_back(safeArray.elems()[j]);
}
local_engine::CHColumnToSparkRow converter;
std::unique_ptr<local_engine::SparkRowInfo> spark_row_info = converter.convertCHColumnToSparkRow(*block, mask);
return local_engine::SparkRowInfoJNI::create(env, *spark_row_info);
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
JNIEXPORT void Java_org_apache_gluten_vectorized_CHBlockConverterJniWrapper_freeMemory(JNIEnv * env, jclass, jlong address, jlong size)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::CHColumnToSparkRow converter;
converter.freeMem(reinterpret_cast<char *>(address), size);
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHBlockConverterJniWrapper_convertSparkRowsToCHColumn(
JNIEnv * env, jclass, jobject java_iter, jobjectArray names, jobjectArray types)
{
LOCAL_ENGINE_JNI_METHOD_START
using namespace std;
int num_columns = env->GetArrayLength(names);
vector<string> c_names;
vector<string> c_types;
c_names.reserve(num_columns);
for (int i = 0; i < num_columns; i++)
{
auto * name = static_cast<jstring>(env->GetObjectArrayElement(names, i));
c_names.emplace_back(jstring2string(env, name));
auto * type = static_cast<jbyteArray>(env->GetObjectArrayElement(types, i));
auto type_length = env->GetArrayLength(type);
jbyte * type_ptr = env->GetByteArrayElements(type, nullptr);
string str_type(reinterpret_cast<const char *>(type_ptr), type_length);
c_types.emplace_back(std::move(str_type));
env->ReleaseByteArrayElements(type, type_ptr, JNI_ABORT);
env->DeleteLocalRef(name);
env->DeleteLocalRef(type);
}
auto * block = local_engine::SparkRowToCHColumn::convertSparkRowItrToCHColumn(java_iter, c_names, c_types);
return reinterpret_cast<jlong>(block);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT void Java_org_apache_gluten_vectorized_CHBlockConverterJniWrapper_freeBlock(JNIEnv * env, jclass, jlong block_address)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::SparkRowToCHColumn::freeBlock(reinterpret_cast<DB::Block *>(block_address));
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHBlockWriterJniWrapper_nativeCreateInstance(JNIEnv * env, jobject)
{
LOCAL_ENGINE_JNI_METHOD_START
auto * writer = new local_engine::NativeWriterInMemory();
return reinterpret_cast<jlong>(writer);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT void
Java_org_apache_gluten_vectorized_CHBlockWriterJniWrapper_nativeWrite(JNIEnv * env, jobject, jlong instance, jlong block_address)
{
LOCAL_ENGINE_JNI_METHOD_START
auto * writer = reinterpret_cast<local_engine::NativeWriterInMemory *>(instance);
auto * block = reinterpret_cast<DB::Block *>(block_address);
writer->write(*block);
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
JNIEXPORT jint Java_org_apache_gluten_vectorized_CHBlockWriterJniWrapper_nativeResultSize(JNIEnv * env, jobject, jlong instance)
{
LOCAL_ENGINE_JNI_METHOD_START
auto * writer = reinterpret_cast<local_engine::NativeWriterInMemory *>(instance);
return static_cast<jint>(writer->collect().size());
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT void
Java_org_apache_gluten_vectorized_CHBlockWriterJniWrapper_nativeCollect(JNIEnv * env, jobject, jlong instance, jbyteArray result)
{
LOCAL_ENGINE_JNI_METHOD_START
auto * writer = reinterpret_cast<local_engine::NativeWriterInMemory *>(instance);
auto data = writer->collect();
env->SetByteArrayRegion(result, 0, data.size(), reinterpret_cast<const jbyte *>(data.data()));
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
JNIEXPORT void Java_org_apache_gluten_vectorized_CHBlockWriterJniWrapper_nativeClose(JNIEnv * env, jobject, jlong instance)
{
LOCAL_ENGINE_JNI_METHOD_START
auto * writer = reinterpret_cast<local_engine::NativeWriterInMemory *>(instance);
delete writer;
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_createFilerWriter(
JNIEnv * env, jobject, jstring file_uri_, jbyteArray writeRel)
{
LOCAL_ENGINE_JNI_METHOD_START
const auto writeRelBytes = local_engine::getByteArrayElementsSafe(env, writeRel);
substrait::WriteRel write_rel = local_engine::BinaryToMessage<substrait::WriteRel>(
{reinterpret_cast<const char *>(writeRelBytes.elems()), static_cast<size_t>(writeRelBytes.length())});
assert(write_rel.has_named_table());
const substrait::NamedObjectWrite & named_table = write_rel.named_table();
local_engine::Write write_opt;
named_table.advanced_extension().optimization().UnpackTo(&write_opt);
DB::Block preferred_schema = local_engine::TypeParser::buildBlockFromNamedStructWithoutDFS(write_rel.table_schema());
const auto file_uri = jstring2string(env, file_uri_);
// for HiveFileFormat, the file url may not end with .parquet, so we pass in the format as a hint
const auto context = local_engine::QueryContext::instance().currentQueryContext();
auto * writer = local_engine::NormalFileWriter::create(context, file_uri, preferred_schema, write_opt.common().format()).release();
return reinterpret_cast<jlong>(writer);
LOCAL_ENGINE_JNI_METHOD_END(env, 0)
}
JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_createMergeTreeWriter(
JNIEnv * env, jobject, jbyteArray writeRel, jbyteArray conf_plan)
{
LOCAL_ENGINE_JNI_METHOD_START
auto query_context = local_engine::QueryContext::instance().currentQueryContext();
// by task update new configs (in case of dynamic config update)
const auto conf_plan_a = local_engine::getByteArrayElementsSafe(env, conf_plan);
local_engine::SparkConfigs::updateConfig(
query_context, {reinterpret_cast<const char *>(conf_plan_a.elems()), static_cast<size_t>(conf_plan_a.length())});
const auto writeRelBytes = local_engine::getByteArrayElementsSafe(env, writeRel);
substrait::WriteRel write_rel = local_engine::BinaryToMessage<substrait::WriteRel>(
{reinterpret_cast<const char *>(writeRelBytes.elems()), static_cast<size_t>(writeRelBytes.length())});
assert(write_rel.has_named_table());
const substrait::NamedObjectWrite & named_table = write_rel.named_table();
local_engine::Write write;
if (!named_table.advanced_extension().optimization().UnpackTo(&write))
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Failed to unpack write optimization with local_engine::Write.");
assert(write.has_common());
assert(write.has_mergetree());
local_engine::MergeTreeTable merge_tree_table(write, write_rel.table_schema());
const std::string & id = write.common().job_task_attempt_id();
return reinterpret_cast<jlong>(local_engine::SparkMergeTreeWriter::create(merge_tree_table, query_context, id).release());
LOCAL_ENGINE_JNI_METHOD_END(env, 0)
}
JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_filterRangesOnDriver(
JNIEnv * env, jclass, jbyteArray plan_, jbyteArray read_)
{
LOCAL_ENGINE_JNI_METHOD_START
const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan_);
auto plan_pb = local_engine::BinaryToMessage<substrait::Plan>(
{reinterpret_cast<const char *>(plan_a.elems()), static_cast<size_t>(plan_a.length())});
auto parser_context = local_engine::ParserContext::build(local_engine::QueryContext::globalContext(), plan_pb);
local_engine::SerializedPlanParser parser(parser_context);
const auto read_a = local_engine::getByteArrayElementsSafe(env, read_);
auto read_pb = local_engine::BinaryToMessage<substrait::Rel>(
{reinterpret_cast<const char *>(read_a.elems()), static_cast<size_t>(read_a.length())});
local_engine::MergeTreeRelParser mergeTreeParser(parser_context, local_engine::QueryContext::globalContext());
auto res = mergeTreeParser.filterRangesOnDriver(read_pb.read());
return local_engine::charTojstring(env, res.c_str());
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
JNIEXPORT void
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_write(JNIEnv * env, jobject, jlong instanceId, jlong block_address)
{
LOCAL_ENGINE_JNI_METHOD_START
auto * writer = reinterpret_cast<local_engine::NativeOutputWriter *>(instanceId);
auto * block = reinterpret_cast<DB::Block *>(block_address);
writer->write(*block);
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
JNIEXPORT void Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_close(JNIEnv * env, jobject, jlong instanceId)
{
LOCAL_ENGINE_JNI_METHOD_START
auto * writer = reinterpret_cast<local_engine::NativeOutputWriter *>(instanceId);
SCOPE_EXIT({ delete writer; });
writer->close();
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_nativeMergeMTParts(
JNIEnv * env, jclass, jbyteArray split_info_, jstring partition_dir_, jstring bucket_dir_)
{
LOCAL_ENGINE_JNI_METHOD_START
const auto uuid_str = toString(DB::UUIDHelpers::generateV4());
const auto partition_dir = jstring2string(env, partition_dir_);
const auto bucket_dir = jstring2string(env, bucket_dir_);
const auto split_info_a = local_engine::getByteArrayElementsSafe(env, split_info_);
auto extension_table = local_engine::BinaryToMessage<substrait::ReadRel::ExtensionTable>(
{reinterpret_cast<const char *>(split_info_a.elems()), static_cast<size_t>(split_info_a.length())});
local_engine::MergeTreeTableInstance merge_tree_table(extension_table);
auto context = local_engine::QueryContext::instance().currentQueryContext();
// each task, using its own CustomStorageMergeTree, doesn't reuse
auto temp_storage = merge_tree_table.copyToVirtualStorage(context);
// prefetch all needed parts metadata before merge
local_engine::restoreMetaData(temp_storage, merge_tree_table, *context);
// to release temp CustomStorageMergeTree with RAII
DB::StorageID storage_id = temp_storage->getStorageID();
SCOPE_EXIT({ local_engine::StorageMergeTreeFactory::freeStorage(storage_id); });
std::vector<DB::DataPartPtr> selected_parts
= local_engine::StorageMergeTreeFactory::getDataPartsByNames(temp_storage->getStorageID(), "", merge_tree_table.getPartNames());
DB::MergeTreeDataPartPtr loaded = local_engine::mergeParts(selected_parts, uuid_str, *temp_storage, partition_dir, bucket_dir);
std::vector<local_engine::PartInfo> res;
saveFileStatus(*temp_storage, context, loaded->name, const_cast<DB::IDataPartStorage &>(loaded->getDataPartStorage()));
res.emplace_back(local_engine::PartInfo{
loaded->name, loaded->getMarksCount(), loaded->getBytesOnDisk(), loaded->rows_count, partition_dir, bucket_dir});
auto json_info = local_engine::PartInfo::toJson(res);
return local_engine::charTojstring(env, json_info.c_str());
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
JNIEXPORT jobject Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_splitBlockByPartitionAndBucket(
JNIEnv * env, jclass, jlong blockAddress, jintArray partitionColIndice, jboolean hasBucket)
{
LOCAL_ENGINE_JNI_METHOD_START
auto * block = reinterpret_cast<DB::Block *>(blockAddress);
auto safeArray = local_engine::getIntArrayElementsSafe(env, partitionColIndice);
std::vector<size_t> partition_col_indice_vec;
for (int i = 0; i < safeArray.length(); ++i)
partition_col_indice_vec.push_back(safeArray.elems()[i]);
auto query_context = local_engine::QueryContext::instance().currentQueryContext();
const DB::Settings & settings = query_context->getSettingsRef();
bool reserve_ = local_engine::settingsEqual(settings, "gluten.write.reserve_partition_columns", "true");
local_engine::BlockStripes bs = local_engine::BlockStripeSplitter::split(*block, partition_col_indice_vec, hasBucket, reserve_);
auto * addresses = env->NewLongArray(bs.block_addresses.size());
env->SetLongArrayRegion(addresses, 0, bs.block_addresses.size(), reinterpret_cast<const jlong *>(bs.block_addresses.data()));
auto * indices = env->NewIntArray(bs.heading_row_indice.size());
env->SetIntArrayRegion(indices, 0, bs.heading_row_indice.size(), bs.heading_row_indice.data());
jobject block_stripes = env->NewObject(
block_stripes_class, block_stripes_constructor, bs.origin_block_address, addresses, indices, bs.origin_block_num_columns);
return block_stripes;
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
JNIEXPORT jlong Java_org_apache_gluten_vectorized_StorageJoinBuilder_nativeBuild(
JNIEnv * env,
jclass,
jstring key,
jbyteArray in,
jlong row_count_,
jstring join_key_,
jint join_type_,
jboolean has_mixed_join_condition,
jboolean is_existence_join,
jbyteArray named_struct,
jboolean is_null_aware_anti_join,
jboolean has_null_key_values)
{
LOCAL_ENGINE_JNI_METHOD_START
const auto hash_table_id = jstring2string(env, key);
const auto join_key = jstring2string(env, join_key_);
const auto named_struct_a = local_engine::getByteArrayElementsSafe(env, named_struct);
const std::string::size_type struct_size = named_struct_a.length();
std::string struct_string{reinterpret_cast<const char *>(named_struct_a.elems()), struct_size};
const jsize length = env->GetArrayLength(in);
local_engine::ReadBufferFromByteArray read_buffer_from_java_array(in, length);
DB::CompressedReadBuffer input(read_buffer_from_java_array);
local_engine::configureCompressedReadBuffer(input);
const auto * obj = make_wrapper(local_engine::BroadCastJoinBuilder::buildJoin(
hash_table_id,
input,
row_count_,
join_key,
join_type_,
has_mixed_join_condition,
is_existence_join,
struct_string,
is_null_aware_anti_join,
has_null_key_values));
return obj->instance();
LOCAL_ENGINE_JNI_METHOD_END(env, 0)
}
JNIEXPORT jlong Java_org_apache_gluten_vectorized_StorageJoinBuilder_nativeCloneBuildHashTable(JNIEnv * env, jclass, jlong instance)
{
LOCAL_ENGINE_JNI_METHOD_START
auto * cloned
= local_engine::make_wrapper(local_engine::SharedPointerWrapper<local_engine::StorageJoinFromReadBuffer>::sharedPtr(instance));
return cloned->instance();
LOCAL_ENGINE_JNI_METHOD_END(env, 0)
}
JNIEXPORT void
Java_org_apache_gluten_vectorized_StorageJoinBuilder_nativeCleanBuildHashTable(JNIEnv * env, jclass, jstring hash_table_id_, jlong instance)
{
LOCAL_ENGINE_JNI_METHOD_START
auto hash_table_id = jstring2string(env, hash_table_id_);
local_engine::BroadCastJoinBuilder::cleanBuildHashTable(hash_table_id, instance);
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
// BlockSplitIterator
JNIEXPORT jlong Java_org_apache_gluten_vectorized_BlockSplitIterator_nativeCreate(
JNIEnv * env,
jobject,
jobject in,
jstring name,
jstring expr,
jstring schema,
jint partition_num,
jint buffer_size,
jstring hash_algorithm)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::NativeSplitter::Options options;
options.partition_num = partition_num;
options.buffer_size = buffer_size;
auto hash_algorithm_str = jstring2string(env, hash_algorithm);
options.hash_algorithm.swap(hash_algorithm_str);
auto expr_str = jstring2string(env, expr);
std::string schema_str;
if (schema)
schema_str = jstring2string(env, schema);
options.exprs_buffer.swap(expr_str);
options.schema_buffer.swap(schema_str);
local_engine::NativeSplitter::Holder * splitter = new local_engine::NativeSplitter::Holder{
.splitter = local_engine::NativeSplitter::create(jstring2string(env, name), options, in)};
return reinterpret_cast<jlong>(splitter);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT void Java_org_apache_gluten_vectorized_BlockSplitIterator_nativeClose(JNIEnv * env, jobject, jlong instance)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::NativeSplitter::Holder * splitter = reinterpret_cast<local_engine::NativeSplitter::Holder *>(instance);
delete splitter;
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
JNIEXPORT jboolean Java_org_apache_gluten_vectorized_BlockSplitIterator_nativeHasNext(JNIEnv * env, jobject, jlong instance)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::NativeSplitter::Holder * splitter = reinterpret_cast<local_engine::NativeSplitter::Holder *>(instance);
return splitter->splitter->hasNext();
LOCAL_ENGINE_JNI_METHOD_END(env, false)
}
JNIEXPORT jlong Java_org_apache_gluten_vectorized_BlockSplitIterator_nativeNext(JNIEnv * env, jobject, jlong instance)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::NativeSplitter::Holder * splitter = reinterpret_cast<local_engine::NativeSplitter::Holder *>(instance);
return reinterpret_cast<jlong>(splitter->splitter->next());
LOCAL_ENGINE_JNI_METHOD_END(env, false)
}
JNIEXPORT jint Java_org_apache_gluten_vectorized_BlockSplitIterator_nativeNextPartitionId(JNIEnv * env, jobject, jlong instance)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::NativeSplitter::Holder * splitter = reinterpret_cast<local_engine::NativeSplitter::Holder *>(instance);
return reinterpret_cast<jint>(splitter->splitter->nextPartitionId());
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT jlong Java_org_apache_gluten_vectorized_BlockOutputStream_nativeCreate(
JNIEnv * env,
jobject,
jobject output_stream,
jbyteArray buffer,
jstring codec,
jint level,
jboolean compressed,
jint customize_buffer_size)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::ShuffleWriter * writer
= new local_engine::ShuffleWriter(output_stream, buffer, jstring2string(env, codec), level, compressed, customize_buffer_size);
return reinterpret_cast<jlong>(writer);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT long Java_org_apache_gluten_vectorized_BlockOutputStream_directWrite(
JNIEnv * env,
jclass,
jobject output_stream,
jbyteArray buffer,
jint customize_buffer_size,
jlong block_address)
{
LOCAL_ENGINE_JNI_METHOD_START
DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
auto wb = std::make_shared<local_engine::WriteBufferFromJavaOutputStream>(output_stream, buffer, customize_buffer_size);
auto native_writer = std::make_unique<local_engine::NativeWriter>(*wb, block->cloneEmpty());
auto write_size = native_writer->write(*block);
native_writer->flush();
wb->finalize();
return write_size;
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT void Java_org_apache_gluten_vectorized_BlockOutputStream_nativeClose(JNIEnv * env, jobject, jlong instance)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::ShuffleWriter * writer = reinterpret_cast<local_engine::ShuffleWriter *>(instance);
writer->flush();
delete writer;
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
JNIEXPORT void Java_org_apache_gluten_vectorized_BlockOutputStream_nativeWrite(JNIEnv * env, jobject, jlong instance, jlong block_address)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::ShuffleWriter * writer = reinterpret_cast<local_engine::ShuffleWriter *>(instance);
DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
writer->write(*block);
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
JNIEXPORT void Java_org_apache_gluten_vectorized_BlockOutputStream_nativeFlush(JNIEnv * env, jobject, jlong instance)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::ShuffleWriter * writer = reinterpret_cast<local_engine::ShuffleWriter *>(instance);
writer->flush();
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
JNIEXPORT jlong
Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIEnv * env, jclass, jobject input, jbyteArray plan)
{
LOCAL_ENGINE_JNI_METHOD_START
const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan);
const std::string::size_type plan_size = plan_a.length();
auto plan_pb = local_engine::BinaryToMessage<substrait::Plan>({reinterpret_cast<const char *>(plan_a.elems()), plan_size});
auto parser_context = local_engine::ParserContext::build(local_engine::QueryContext::globalContext(), plan_pb);
local_engine::SerializedPlanParser parser(parser_context);
const jobject iter = env->NewGlobalRef(input);
parser.addInputIter(iter, false);
local_engine::LocalExecutor * executor = parser.createExecutor(plan_pb).release();
return reinterpret_cast<jlong>(executor);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT void Java_org_apache_gluten_vectorized_SimpleExpressionEval_nativeClose(JNIEnv * env, jclass, jlong instance)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(instance);
delete executor;
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
JNIEXPORT jboolean Java_org_apache_gluten_vectorized_SimpleExpressionEval_nativeHasNext(JNIEnv * env, jclass, jlong instance)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(instance);
return executor->hasNext();
LOCAL_ENGINE_JNI_METHOD_END(env, false)
}
JNIEXPORT jlong Java_org_apache_gluten_vectorized_SimpleExpressionEval_nativeNext(JNIEnv * env, jclass, jlong instance)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::LocalExecutor * executor = reinterpret_cast<local_engine::LocalExecutor *>(instance);
return reinterpret_cast<jlong>(executor->nextColumnar());
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
JNIEXPORT jlong Java_org_apache_gluten_memory_CHThreadGroup_createThreadGroup(JNIEnv * env, jclass, jstring task_id_)
{
LOCAL_ENGINE_JNI_METHOD_START
auto task_id = jstring2string(env, task_id_);
return local_engine::QueryContext::instance().initializeQuery(task_id);
LOCAL_ENGINE_JNI_METHOD_END(env, 0l)
}
JNIEXPORT jlong Java_org_apache_gluten_memory_CHThreadGroup_threadGroupPeakMemory(JNIEnv * env, jclass, jlong id)
{
LOCAL_ENGINE_JNI_METHOD_START
return local_engine::QueryContext::instance().currentPeakMemory(id);
LOCAL_ENGINE_JNI_METHOD_END(env, 0l)
}
JNIEXPORT void Java_org_apache_gluten_memory_CHThreadGroup_releaseThreadGroup(JNIEnv * env, jclass, jlong id)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::QueryContext::instance().finalizeQuery(id);
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
// only for UT GlutenClickHouseNativeExceptionSuite
JNIEXPORT void Java_org_apache_gluten_utils_TestExceptionUtils_generateNativeException(JNIEnv * env)
{
LOCAL_ENGINE_JNI_METHOD_START
throw DB::Exception(DB::ErrorCodes::UNKNOWN_EXCEPTION, "test native exception");
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
JNIEXPORT jstring Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(
JNIEnv * env, jobject, jstring table_, jstring columns_, jboolean only_meta_cache_)
{
LOCAL_ENGINE_JNI_METHOD_START
auto table_def = jstring2string(env, table_);
auto columns = jstring2string(env, columns_);
Poco::StringTokenizer tokenizer(columns, ",");
std::unordered_set<String> column_set;
for (const auto & col : tokenizer)
column_set.insert(col);
local_engine::MergeTreeTableInstance table(table_def);
auto id = local_engine::CacheManager::instance().cacheParts(table, column_set, only_meta_cache_);
return local_engine::charTojstring(env, id.c_str());
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr);
}
JNIEXPORT jobject Java_org_apache_gluten_execution_CHNativeCacheManager_nativeGetCacheStatus(JNIEnv * env, jobject, jstring id)
{
LOCAL_ENGINE_JNI_METHOD_START
return local_engine::CacheManager::instance().getCacheStatus(env, jstring2string(env, id));
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr);
}
JNIEXPORT jstring Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheFiles(JNIEnv * env, jobject, jbyteArray files)
{
LOCAL_ENGINE_JNI_METHOD_START
const auto files_bytes = local_engine::getByteArrayElementsSafe(env, files);
const std::string::size_type files_bytes_size = files_bytes.length();
std::string_view files_view = {reinterpret_cast<const char *>(files_bytes.elems()), files_bytes_size};
substrait::ReadRel::LocalFiles local_files = local_engine::BinaryToMessage<substrait::ReadRel::LocalFiles>(files_view);
auto jobId = local_engine::CacheManager::instance().cacheFiles(local_files);
return local_engine::charTojstring(env, jobId.c_str());
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr);
}
JNIEXPORT void Java_org_apache_gluten_execution_CHNativeCacheManager_removeFiles(JNIEnv * env, jobject, jstring file_, jstring cache_name_)
{
LOCAL_ENGINE_JNI_METHOD_START
auto file = jstring2string(env, file_);
auto cache_name = jstring2string(env, cache_name_);
local_engine::CacheManager::removeFiles(file, cache_name);
LOCAL_ENGINE_JNI_METHOD_END(env, );
}
JNIEXPORT jlong Java_org_apache_gluten_vectorized_DeltaWriterJNIWrapper_createDeletionVectorWriter(
JNIEnv * env, jclass, jstring table_path_, jint prefix_length_, jlong packingTargetSize_)
{
LOCAL_ENGINE_JNI_METHOD_START
auto table_path = jstring2string(env, table_path_);
const auto query_context = local_engine::QueryContext::instance().currentQueryContext();
auto writer = new local_engine::delta::DeltaWriter(query_context, table_path, prefix_length_, packingTargetSize_);
return reinterpret_cast<jlong>(writer);
LOCAL_ENGINE_JNI_METHOD_END(env, -1);
}
JNIEXPORT void Java_org_apache_gluten_vectorized_DeltaWriterJNIWrapper_deletionVectorWrite(
JNIEnv * env, jclass, jlong writer_address_, jlong blockAddress)
{
LOCAL_ENGINE_JNI_METHOD_START
const auto * block = reinterpret_cast<DB::Block *>(blockAddress);
auto * writer = reinterpret_cast<local_engine::delta::DeltaWriter *>(writer_address_);
writer->writeDeletionVector(*block);
LOCAL_ENGINE_JNI_METHOD_END(env, );
}
JNIEXPORT jlong
Java_org_apache_gluten_vectorized_DeltaWriterJNIWrapper_deletionVectorWriteFinalize(JNIEnv * env, jclass, jlong writer_address_)
{
LOCAL_ENGINE_JNI_METHOD_START
auto * writer = reinterpret_cast<local_engine::delta::DeltaWriter *>(writer_address_);
auto * column_batch = writer->finalize();
delete writer;
return reinterpret_cast<UInt64>(column_batch);
LOCAL_ENGINE_JNI_METHOD_END(env, -1);
}
#ifdef __cplusplus
}
#endif