cpp-ch/local-engine/local_engine_jni.cpp (1,135 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include <numeric>
#include <string>
#include <jni.h>
#include <Builder/SerializedPlanBuilder.h>
#include <Compression/CompressedReadBuffer.h>
#include <DataTypes/DataTypeNullable.h>
#include <Join/BroadCastJoinBuilder.h>
#include <Operator/BlockCoalesceOperator.h>
#include <Parser/CHColumnToSparkRow.h>
#include <Parser/MergeTreeRelParser.h>
#include <Parser/RelParser.h>
#include <Parser/SerializedPlanParser.h>
#include <Parser/SparkRowToCHColumn.h>
#include <Shuffle/CachedShuffleWriter.h>
#include <Shuffle/NativeSplitter.h>
#include <Shuffle/NativeWriterInMemory.h>
#include <Shuffle/PartitionWriter.h>
#include <Shuffle/ShuffleReader.h>
#include <Shuffle/ShuffleSplitter.h>
#include <Shuffle/ShuffleWriter.h>
#include <Shuffle/ShuffleWriterBase.h>
#include <Shuffle/WriteBufferFromJavaOutputStream.h>
#include <Storages/Mergetree/SparkMergeTreeWriter.h>
#include <Storages/Output/BlockStripeSplitter.h>
#include <Storages/Output/FileWriterWrappers.h>
#include <Storages/SubstraitSource/ReadBufferBuilder.h>
#include <jni/ReservationListenerWrapper.h>
#include <jni/SharedPointerWrapper.h>
#include <jni/jni_common.h>
#include <jni/jni_error.h>
#include <Poco/Logger.h>
#include <Poco/StringTokenizer.h>
#include <Common/CHUtil.h>
#include <Common/CurrentThread.h>
#include <Common/ExceptionUtils.h>
#include <Common/JNIUtils.h>
#include <Common/QueryContext.h>
#ifdef __cplusplus
// Declaration (extern) of a ClickHouse error code whose definition lives in another translation unit.
// NOTE(review): not referenced in the visible part of this file; confirm it is still needed.
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PARSE_PROTOBUF_SCHEMA;
}
}
/// Look up one column of a native block addressed by a raw pointer passed from Java.
/// @param block_address  a `DB::Block *` reinterpreted as a jlong
/// @param column_position  zero-based column index inside that block
/// @return a copy of the column entry (column pointer + type + name)
static DB::ColumnWithTypeAndName getColumnFromColumnVector(JNIEnv * /*env*/, jobject /*obj*/, jlong block_address, jint column_position)
{
    const auto & source_block = *reinterpret_cast<DB::Block *>(block_address);
    return source_block.getByPosition(column_position);
}
/// Convert a Java String into a UTF-8 encoded std::string.
/// Uses `String.getBytes("UTF-8")` so the result is standard UTF-8, not JNI's modified UTF-8.
/// @param env   current JNI environment
/// @param jStr  Java string; may be null, in which case "" is returned
/// @return the UTF-8 bytes of @p jStr
static std::string jstring2string(JNIEnv * env, jstring jStr)
{
    if (!jStr)
        return "";
    jclass string_class = env->GetObjectClass(jStr);
    jmethodID get_bytes = env->GetMethodID(string_class, "getBytes", "(Ljava/lang/String;)[B");
    // Keep the charset name in a local so its local reference can be released below.
    jstring charset_name = env->NewStringUTF("UTF-8");
    jbyteArray string_jbytes = static_cast<jbyteArray>(local_engine::safeCallObjectMethod(env, jStr, get_bytes, charset_name));
    env->DeleteLocalRef(charset_name);
    const auto length = static_cast<size_t>(env->GetArrayLength(string_jbytes));
    jbyte * p_bytes = env->GetByteArrayElements(string_jbytes, nullptr);
    std::string ret(reinterpret_cast<const char *>(p_bytes), length);
    // JNI_ABORT: the buffer was only read, so nothing needs to be copied back.
    env->ReleaseByteArrayElements(string_jbytes, p_bytes, JNI_ABORT);
    env->DeleteLocalRef(string_jbytes);
    env->DeleteLocalRef(string_class);
    return ret;
}
/// Build a Java String from a NUL-terminated C string, decoding its bytes as UTF-8
/// via the `String(byte[], String charsetName)` constructor.
/// @param env  current JNI environment
/// @param pat  NUL-terminated byte string (must not be null)
/// @return a new local reference to the Java String
static jstring stringTojstring(JNIEnv * env, const char * pat)
{
    const auto byte_count = static_cast<jsize>(strlen(pat));
    jclass str_class = env->FindClass("java/lang/String");
    jmethodID ctor_id = env->GetMethodID(str_class, "<init>", "([BLjava/lang/String;)V");
    jbyteArray bytes = env->NewByteArray(byte_count);
    env->SetByteArrayRegion(bytes, 0, byte_count, reinterpret_cast<const jbyte *>(pat));
    jstring encoding = env->NewStringUTF("UTF-8");
    auto * result = static_cast<jstring>(env->NewObject(str_class, ctor_id, bytes, encoding));
    // Release the temporaries; only the resulting String is handed back to the caller.
    env->DeleteLocalRef(encoding);
    env->DeleteLocalRef(bytes);
    env->DeleteLocalRef(str_class);
    return result;
}
extern "C" {
#endif
// Forward declaration of dbms::LocalExecutor.
// NOTE(review): the JNI functions below use local_engine::LocalExecutor, not this type;
// this declaration looks stale — confirm it is unused before removing it.
namespace dbms
{
class LocalExecutor;
}
// Java classes (as global references) and constructor IDs used to build result objects.
// They are assigned in JNI_OnLoad; the class references are released in JNI_OnUnload.
static jclass spark_row_info_class;
static jmethodID spark_row_info_constructor;
static jclass block_stripes_class;
static jmethodID block_stripes_constructor;
static jclass split_result_class;
static jmethodID split_result_constructor;
static jclass native_metrics_class;
static jmethodID native_metrics_constructor;
/// Library load hook invoked by the JVM.
/// Caches global class references and method IDs for every Java type this library calls back into,
/// initializes the JNI error state and BroadCastJoinBuilder, and records the JavaVM in JNIUtils::vm.
/// @return JNI_ERR if no JNI 1.8 environment is available, otherwise JNI_VERSION_1_8.
JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/)
{
JNIEnv * env;
if (vm->GetEnv(reinterpret_cast<void **>(&env), JNI_VERSION_1_8) != JNI_OK)
return JNI_ERR;
local_engine::JniErrorsGlobalState::instance().initialize(env);
// Result-object classes constructed directly by functions in this file.
spark_row_info_class = local_engine::CreateGlobalClassReference(env, "Lio/glutenproject/row/SparkRowInfo;");
spark_row_info_constructor = local_engine::GetMethodID(env, spark_row_info_class, "<init>", "([J[JJJJ)V");
block_stripes_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/spark/sql/execution/datasources/BlockStripes;");
block_stripes_constructor = local_engine::GetMethodID(env, block_stripes_class, "<init>", "(J[J[II)V");
split_result_class = local_engine::CreateGlobalClassReference(env, "Lio/glutenproject/vectorized/CHSplitResult;");
split_result_constructor = local_engine::GetMethodID(env, split_result_class, "<init>", "(JJJJJJ[J[JJJJ)V");
// Java stream/iterator classes stored as static members of the native helper classes.
local_engine::ShuffleReader::input_stream_class
= local_engine::CreateGlobalClassReference(env, "Lio/glutenproject/vectorized/ShuffleInputStream;");
local_engine::NativeSplitter::iterator_class
= local_engine::CreateGlobalClassReference(env, "Lio/glutenproject/vectorized/IteratorWrapper;");
local_engine::WriteBufferFromJavaOutputStream::output_stream_class
= local_engine::CreateGlobalClassReference(env, "Ljava/io/OutputStream;");
local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class
= local_engine::CreateGlobalClassReference(env, "Lio/glutenproject/execution/ColumnarNativeIterator;");
// Method IDs on those classes; each lookup depends on its class reference being set above.
local_engine::SourceFromJavaIter::serialized_record_batch_iterator_hasNext
= local_engine::GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "hasNext", "()Z");
local_engine::SourceFromJavaIter::serialized_record_batch_iterator_next
= local_engine::GetMethodID(env, local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class, "next", "()[B");
local_engine::ShuffleReader::input_stream_read
= local_engine::GetMethodID(env, local_engine::ShuffleReader::input_stream_class, "read", "(JJ)J");
local_engine::NativeSplitter::iterator_has_next
= local_engine::GetMethodID(env, local_engine::NativeSplitter::iterator_class, "hasNext", "()Z");
local_engine::NativeSplitter::iterator_next
= local_engine::GetMethodID(env, local_engine::NativeSplitter::iterator_class, "next", "()J");
local_engine::WriteBufferFromJavaOutputStream::output_stream_write
= local_engine::GetMethodID(env, local_engine::WriteBufferFromJavaOutputStream::output_stream_class, "write", "([BII)V");
local_engine::WriteBufferFromJavaOutputStream::output_stream_flush
= local_engine::GetMethodID(env, local_engine::WriteBufferFromJavaOutputStream::output_stream_class, "flush", "()V");
// Spark row iterator (row -> columnar conversion path).
local_engine::SparkRowToCHColumn::spark_row_interator_class
= local_engine::CreateGlobalClassReference(env, "Lio/glutenproject/execution/SparkRowIterator;");
local_engine::SparkRowToCHColumn::spark_row_interator_hasNext
= local_engine::GetMethodID(env, local_engine::SparkRowToCHColumn::spark_row_interator_class, "hasNext", "()Z");
local_engine::SparkRowToCHColumn::spark_row_interator_next
= local_engine::GetMethodID(env, local_engine::SparkRowToCHColumn::spark_row_interator_class, "next", "()[B");
local_engine::SparkRowToCHColumn::spark_row_iterator_nextBatch = local_engine::GetMethodID(
env, local_engine::SparkRowToCHColumn::spark_row_interator_class, "nextBatch", "()Ljava/nio/ByteBuffer;");
// Memory reservation listener callbacks.
local_engine::ReservationListenerWrapper::reservation_listener_class
= local_engine::CreateGlobalClassReference(env, "Lio/glutenproject/memory/alloc/CHReservationListener;");
local_engine::ReservationListenerWrapper::reservation_listener_reserve
= local_engine::GetMethodID(env, local_engine::ReservationListenerWrapper::reservation_listener_class, "reserve", "(J)J");
local_engine::ReservationListenerWrapper::reservation_listener_reserve_or_throw
= local_engine::GetMethodID(env, local_engine::ReservationListenerWrapper::reservation_listener_class, "reserveOrThrow", "(J)V");
local_engine::ReservationListenerWrapper::reservation_listener_unreserve
= local_engine::GetMethodID(env, local_engine::ReservationListenerWrapper::reservation_listener_class, "unreserve", "(J)J");
native_metrics_class = local_engine::CreateGlobalClassReference(env, "Lio/glutenproject/metrics/NativeMetrics;");
native_metrics_constructor = local_engine::GetMethodID(env, native_metrics_class, "<init>", "(Ljava/lang/String;)V");
local_engine::BroadCastJoinBuilder::init(env);
// Remember the VM so native threads can later obtain a JNIEnv.
local_engine::JNIUtils::vm = vm;
return JNI_VERSION_1_8;
}
/// Library unload hook invoked by the JVM.
/// Finalizes the backend, then releases the JNI error state, the BroadCastJoinBuilder state
/// and every global class reference created in JNI_OnLoad.
JNIEXPORT void JNI_OnUnload(JavaVM * vm, void * /*reserved*/)
{
    local_engine::BackendFinalizerUtil::finalizeGlobally();

    // Without a valid JNIEnv none of the JNI cleanup below can be performed safely;
    // previously a failed GetEnv left `env` uninitialized and it was dereferenced anyway.
    JNIEnv * env = nullptr;
    if (vm->GetEnv(reinterpret_cast<void **>(&env), JNI_VERSION_1_8) != JNI_OK || env == nullptr)
        return;

    local_engine::JniErrorsGlobalState::instance().destroy(env);
    local_engine::BroadCastJoinBuilder::destroy(env);
    env->DeleteGlobalRef(spark_row_info_class);
    env->DeleteGlobalRef(block_stripes_class);
    env->DeleteGlobalRef(split_result_class);
    env->DeleteGlobalRef(local_engine::ShuffleReader::input_stream_class);
    env->DeleteGlobalRef(local_engine::NativeSplitter::iterator_class);
    env->DeleteGlobalRef(local_engine::WriteBufferFromJavaOutputStream::output_stream_class);
    env->DeleteGlobalRef(local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class);
    env->DeleteGlobalRef(local_engine::SparkRowToCHColumn::spark_row_interator_class);
    env->DeleteGlobalRef(local_engine::ReservationListenerWrapper::reservation_listener_class);
    env->DeleteGlobalRef(native_metrics_class);
}
/// Initialize the native backend from a serialized configuration plan.
/// @param conf_plan  serialized config bytes, passed verbatim to BackendInitializerUtil::init
JNIEXPORT void Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeInitNative(JNIEnv * env, jobject, jbyteArray conf_plan)
{
    LOCAL_ENGINE_JNI_METHOD_START
    // Copy the bytes out and release the JNI buffer before initializing, so the buffer is not
    // leaked (or left pinned) if init throws and control jumps to the catch in the END macro.
    const jsize plan_buf_size = env->GetArrayLength(conf_plan);
    jbyte * plan_buf_addr = env->GetByteArrayElements(conf_plan, nullptr);
    std::string plan_str(reinterpret_cast<const char *>(plan_buf_addr), plan_buf_size);
    env->ReleaseByteArrayElements(conf_plan, plan_buf_addr, JNI_ABORT);
    local_engine::BackendInitializerUtil::init(&plan_str);
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
/// Finalize per-session backend state (BackendFinalizerUtil::finalizeSessionally).
/// Every JNI native function receives the receiver object (or class) as its second argument;
/// it is declared here so the C signature matches what the JVM actually passes.
JNIEXPORT void Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeFinalizeNative(JNIEnv * env, jobject /*obj*/)
{
    LOCAL_ENGINE_JNI_METHOD_START
    local_engine::BackendFinalizerUtil::finalizeSessionally();
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
/// Parse a serialized Substrait plan and start executing it.
/// @param allocator_id      id whose allocator supplies the query context
/// @param plan              serialized plan bytes
/// @param split_infos       array of byte[]; each is passed to SerializedPlanParser::addSplitInfo
/// @param iter_arr          Java input iterators; each is promoted to a global ref and handed to the parser
/// @param conf_plan         serialized config applied via BackendInitializerUtil::updateConfig
/// @param materialize_input forwarded to SerializedPlanParser::addInputIter
/// @return address of a heap-allocated local_engine::LocalExecutor, or -1 on error
JNIEXPORT jlong Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeCreateKernelWithIterator(
    JNIEnv * env,
    jobject /*obj*/,
    jlong allocator_id,
    jbyteArray plan,
    jobjectArray split_infos,
    jobjectArray iter_arr,
    jbyteArray conf_plan,
    jboolean materialize_input)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto query_context = local_engine::getAllocator(allocator_id)->query_context;

    // Copy a Java byte[] into a std::string and release the JNI buffer right away, so no buffer
    // is leaked (or left pinned) if a later step throws.
    auto copy_byte_array = [env](jbyteArray array) -> std::string
    {
        const jsize size = env->GetArrayLength(array);
        jbyte * address = env->GetByteArrayElements(array, nullptr);
        std::string result(reinterpret_cast<const char *>(address), size);
        env->ReleaseByteArrayElements(array, address, JNI_ABORT);
        return result;
    };

    // by task update new configs ( in case of dynamic config update )
    std::string plan_str = copy_byte_array(conf_plan);
    local_engine::BackendInitializerUtil::updateConfig(query_context, &plan_str);
    local_engine::SerializedPlanParser parser(query_context);

    const jsize iter_num = env->GetArrayLength(iter_arr);
    for (jsize i = 0; i < iter_num; i++)
    {
        jobject local_iter = env->GetObjectArrayElement(iter_arr, i);
        jobject iter = env->NewGlobalRef(local_iter);
        // Only the global ref is kept by the parser; drop the local one so large arrays
        // do not exhaust the local reference table.
        env->DeleteLocalRef(local_iter);
        parser.addInputIter(iter, materialize_input);
    }

    const jsize split_info_arr_size = env->GetArrayLength(split_infos);
    for (jsize i = 0; i < split_info_arr_size; i++)
    {
        auto split_info = static_cast<jbyteArray>(env->GetObjectArrayElement(split_infos, i));
        std::string split_info_str = copy_byte_array(split_info);
        env->DeleteLocalRef(split_info);
        parser.addSplitInfo(split_info_str);
    }

    std::string plan_string = copy_byte_array(plan);
    auto query_plan = parser.parse(plan_string);
    local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(parser.query_context, query_context);
    executor->setMetric(parser.getMetric());
    executor->setExtraPlanHolder(parser.extra_plan_holder);
    executor->execute(std::move(query_plan));
    return reinterpret_cast<jlong>(executor);
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Row iterator: report whether the LocalExecutor at @p executor_address has more output.
JNIEXPORT jboolean Java_io_glutenproject_row_RowIterator_nativeHasNext(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto * local_executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
    return local_executor->hasNext();
    LOCAL_ENGINE_JNI_METHOD_END(env, false)
}
/// Row iterator: fetch the next batch of Spark rows and wrap it in a Java SparkRowInfo
/// (offsets[], lengths[], buffer address, column count, total bytes).
JNIEXPORT jobject Java_io_glutenproject_row_RowIterator_nativeNext(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto * local_executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
    local_engine::SparkRowInfoPtr row_info = local_executor->next();

    // Per-row offsets, copied into a new Java long[].
    auto * java_offsets = env->NewLongArray(row_info->getNumRows());
    env->SetLongArrayRegion(
        java_offsets, 0, row_info->getNumRows(), reinterpret_cast<const jlong *>(row_info->getOffsets().data()));

    // Per-row lengths, copied into a second Java long[].
    auto * java_lengths = env->NewLongArray(row_info->getNumRows());
    env->SetLongArrayRegion(
        java_lengths, 0, row_info->getNumRows(), reinterpret_cast<const jlong *>(row_info->getLengths().data()));

    const int64_t buffer_address = reinterpret_cast<int64_t>(row_info->getBufferAddress());
    const int64_t num_columns = reinterpret_cast<int64_t>(row_info->getNumCols());
    const int64_t total_bytes = reinterpret_cast<int64_t>(row_info->getTotalBytes());

    return env->NewObject(
        spark_row_info_class, spark_row_info_constructor, java_offsets, java_lengths, buffer_address, num_columns, total_bytes);
    LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
/// Row iterator: destroy the LocalExecutor at @p executor_address.
JNIEXPORT void Java_io_glutenproject_row_RowIterator_nativeClose(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    delete reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
// Columnar Iterator
/// Columnar iterator: report whether the LocalExecutor at @p executor_address has more output.
JNIEXPORT jboolean Java_io_glutenproject_vectorized_BatchIterator_nativeHasNext(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto * local_executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
    return local_executor->hasNext();
    LOCAL_ENGINE_JNI_METHOD_END(env, false)
}
/// Columnar iterator: return the address of the next DB::Block produced by the executor.
JNIEXPORT jlong Java_io_glutenproject_vectorized_BatchIterator_nativeCHNext(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto * local_executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
    DB::Block * next_block = local_executor->nextColumnar();
    return reinterpret_cast<Int64>(next_block);
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Columnar iterator: destroy the LocalExecutor at @p executor_address.
JNIEXPORT void Java_io_glutenproject_vectorized_BatchIterator_nativeClose(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    delete reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
/// Serialize the executor's metrics to JSON (empty string when there are none) and wrap
/// the result in a Java NativeMetrics object.
JNIEXPORT jobject Java_io_glutenproject_vectorized_BatchIterator_nativeFetchMetrics(JNIEnv * env, jobject /*obj*/, jlong executor_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    /// Collect metrics only if optimizations are disabled, otherwise coredump would happen.
    auto * local_executor = reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
    auto metric = local_executor->getMetric();
    String metrics_json;
    if (metric)
        metrics_json = local_engine::RelMetricSerializer::serializeRelMetric(metric);
    else
        metrics_json = "";
    LOG_DEBUG(&Poco::Logger::get("jni"), "{}", metrics_json);
    jstring json_jstring = stringTojstring(env, metrics_json.c_str());
    return env->NewObject(native_metrics_class, native_metrics_constructor, json_jstring);
    LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
/// No-op: the Java temp dir setting is accepted but ignored by this backend.
JNIEXPORT void
Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeSetJavaTmpDir(JNIEnv * /*env*/, jobject /*obj*/, jstring /*dir*/)
{
}
/// No-op: the batch size setting is accepted but ignored by this backend.
JNIEXPORT void
Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeSetBatchSize(JNIEnv * /*env*/, jobject /*obj*/, jint /*batch_size*/)
{
}
/// No-op: the metrics-timing flag is accepted but ignored by this backend.
JNIEXPORT void Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeSetMetricsTime(
JNIEnv * /*env*/, jobject /*obj*/, jboolean /*setMetricsTime*/)
{
}
/// Implements Spark's ColumnVector.hasNull(): true iff the column contains at least one null.
/// The original compared `num_nulls < block->rows()`, which answered "has a non-null value"
/// instead (e.g. an all-null column reported false, a column without nulls reported true).
JNIEXPORT jboolean
Java_io_glutenproject_vectorized_CHColumnVector_nativeHasNull(JNIEnv * env, jobject obj, jlong block_address, jint column_position)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
    if (!col.column->isNullable())
        return false;
    const auto * nullable = checkAndGetColumn<DB::ColumnNullable>(*col.column);
    const auto & null_map = nullable->getNullMapData();
    // Stop at the first null instead of summing the whole null map.
    return std::any_of(null_map.begin(), null_map.end(), [](auto is_null) { return is_null != 0; });
    LOCAL_ENGINE_JNI_METHOD_END(env, false)
}
/// Count the nulls in a column by summing its null map; non-nullable columns have none.
JNIEXPORT jint
Java_io_glutenproject_vectorized_CHColumnVector_nativeNumNulls(JNIEnv * env, jobject obj, jlong block_address, jint column_position)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto column_entry = getColumnFromColumnVector(env, obj, block_address, column_position);
    if (column_entry.column->isNullable())
    {
        const auto & null_map = checkAndGetColumn<DB::ColumnNullable>(*column_entry.column)->getNullMapData();
        return std::accumulate(null_map.begin(), null_map.end(), 0);
    }
    return 0;
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Whether the value at @p row_id of the selected column is null.
JNIEXPORT jboolean Java_io_glutenproject_vectorized_CHColumnVector_nativeIsNullAt(
    JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
    LOCAL_ENGINE_JNI_METHOD_START
    const auto column_entry = getColumnFromColumnVector(env, obj, block_address, column_position);
    return column_entry.column->isNullAt(row_id);
    LOCAL_ENGINE_JNI_METHOD_END(env, false)
}
/// Read a boolean at @p row_id, looking through a Nullable wrapper to the data column.
JNIEXPORT jboolean Java_io_glutenproject_vectorized_CHColumnVector_nativeGetBoolean(
    JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto column_entry = getColumnFromColumnVector(env, obj, block_address, column_position);
    const DB::IColumn * data_col = column_entry.column.get();
    if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(data_col))
        data_col = &nullable_col->getNestedColumn();
    return data_col->getBool(row_id);
    LOCAL_ENGINE_JNI_METHOD_END(env, false)
}
/// Read the first raw byte of the value at @p row_id, looking through a Nullable wrapper.
JNIEXPORT jbyte Java_io_glutenproject_vectorized_CHColumnVector_nativeGetByte(
    JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto column_entry = getColumnFromColumnVector(env, obj, block_address, column_position);
    const DB::IColumn * data_col = column_entry.column.get();
    if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(data_col))
        data_col = &nullable_col->getNestedColumn();
    const auto * raw_bytes = reinterpret_cast<const jbyte *>(data_col->getDataAt(row_id).data);
    return raw_bytes[0];
    LOCAL_ENGINE_JNI_METHOD_END(env, 0)
}
/// Reinterpret the raw bytes of the value at @p row_id as a jshort, looking through a Nullable wrapper.
JNIEXPORT jshort Java_io_glutenproject_vectorized_CHColumnVector_nativeGetShort(
    JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto column_entry = getColumnFromColumnVector(env, obj, block_address, column_position);
    const DB::IColumn * data_col = column_entry.column.get();
    if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(data_col))
        data_col = &nullable_col->getNestedColumn();
    const auto * raw_shorts = reinterpret_cast<const jshort *>(data_col->getDataAt(row_id).data);
    return raw_shorts[0];
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Read an int at @p row_id, looking through a Nullable wrapper. Columns whose declared type id is
/// Date are read with the unsigned accessor; all others with the signed one.
JNIEXPORT jint Java_io_glutenproject_vectorized_CHColumnVector_nativeGetInt(
    JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto column_entry = getColumnFromColumnVector(env, obj, block_address, column_position);
    const DB::IColumn * data_col = column_entry.column.get();
    if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(data_col))
        data_col = &nullable_col->getNestedColumn();
    const bool is_date = column_entry.type->getTypeId() == DB::TypeIndex::Date;
    return is_date ? static_cast<jint>(data_col->getUInt(row_id)) : static_cast<jint>(data_col->getInt(row_id));
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Read a signed 64-bit value at @p row_id, looking through a Nullable wrapper.
JNIEXPORT jlong Java_io_glutenproject_vectorized_CHColumnVector_nativeGetLong(
    JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto column_entry = getColumnFromColumnVector(env, obj, block_address, column_position);
    const DB::IColumn * data_col = column_entry.column.get();
    if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(data_col))
        data_col = &nullable_col->getNestedColumn();
    return data_col->getInt(row_id);
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Read a 32-bit float at @p row_id, looking through a Nullable wrapper.
JNIEXPORT jfloat Java_io_glutenproject_vectorized_CHColumnVector_nativeGetFloat(
    JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto column_entry = getColumnFromColumnVector(env, obj, block_address, column_position);
    const DB::IColumn * data_col = column_entry.column.get();
    if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(data_col))
        data_col = &nullable_col->getNestedColumn();
    return data_col->getFloat32(row_id);
    LOCAL_ENGINE_JNI_METHOD_END(env, 0.0)
}
/// Read a 64-bit float at @p row_id, looking through a Nullable wrapper.
JNIEXPORT jdouble Java_io_glutenproject_vectorized_CHColumnVector_nativeGetDouble(
    JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto column_entry = getColumnFromColumnVector(env, obj, block_address, column_position);
    const DB::IColumn * data_col = column_entry.column.get();
    if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(data_col))
        data_col = &nullable_col->getNestedColumn();
    return data_col->getFloat64(row_id);
    LOCAL_ENGINE_JNI_METHOD_END(env, 0.0)
}
/// Read the value at @p row_id as a Java string, looking through a Nullable wrapper.
JNIEXPORT jstring Java_io_glutenproject_vectorized_CHColumnVector_nativeGetString(
    JNIEnv * env, jobject obj, jint row_id, jlong block_address, jint column_position)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto col = getColumnFromColumnVector(env, obj, block_address, column_position);
    DB::ColumnPtr nested_col = col.column;
    if (const auto * nullable_col = checkAndGetColumn<DB::ColumnNullable>(nested_col.get()))
        nested_col = nullable_col->getNestedColumnPtr();
    // Use the virtual IColumn::getDataAt rather than a checkAndGetColumn<ColumnString> cast:
    // that cast yields nullptr for any non-ColumnString column, which would then be dereferenced.
    // For ColumnString the virtual call dispatches to the same implementation.
    const auto value = nested_col->getDataAt(row_id);
    return local_engine::charTojstring(env, value.toString().c_str());
    LOCAL_ENGINE_JNI_METHOD_END(env, local_engine::charTojstring(env, ""))
}
// native block
/// No-op: the block at the given address is not freed here.
/// NOTE(review): confirm which component owns and frees blocks handed to Java.
JNIEXPORT void Java_io_glutenproject_vectorized_CHNativeBlock_nativeClose(JNIEnv * /*env*/, jobject /*obj*/, jlong /*block_address*/)
{
}
/// Number of rows in the DB::Block at @p block_address.
JNIEXPORT jint Java_io_glutenproject_vectorized_CHNativeBlock_nativeNumRows(JNIEnv * env, jobject /*obj*/, jlong block_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    const auto & native_block = *reinterpret_cast<const DB::Block *>(block_address);
    return native_block.rows();
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Number of columns in the DB::Block at @p block_address.
JNIEXPORT jint Java_io_glutenproject_vectorized_CHNativeBlock_nativeNumColumns(JNIEnv * env, jobject /*obj*/, jlong block_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    const auto & native_block = *reinterpret_cast<const DB::Block *>(block_address);
    return native_block.columns();
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Serialize the type of column @p position into Substrait type bytes and return them as a byte[].
JNIEXPORT jbyteArray
Java_io_glutenproject_vectorized_CHNativeBlock_nativeColumnType(JNIEnv * env, jobject /*obj*/, jlong block_address, jint position)
{
    LOCAL_ENGINE_JNI_METHOD_START
    const auto & native_block = *reinterpret_cast<const DB::Block *>(block_address);
    const auto & column_entry = native_block.getByPosition(position);
    std::string serialized_type;
    dbms::SerializedPlanBuilder::buildType(column_entry.type, serialized_type);
    return local_engine::stringTojbyteArray(env, serialized_type);
    LOCAL_ENGINE_JNI_METHOD_END(env, local_engine::stringTojbyteArray(env, ""))
}
/// Byte size reported by DB::Block::bytes() for the block at @p block_address.
JNIEXPORT jlong Java_io_glutenproject_vectorized_CHNativeBlock_nativeTotalBytes(JNIEnv * env, jobject /*obj*/, jlong block_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    const auto & native_block = *reinterpret_cast<const DB::Block *>(block_address);
    return native_block.bytes();
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Create a ShuffleReader reading from a Java input stream (held through a new global ref).
/// @return address of the heap-allocated ShuffleReader, or -1 on error
JNIEXPORT jlong Java_io_glutenproject_vectorized_CHStreamReader_createNativeShuffleReader(
    JNIEnv * env, jclass /*clazz*/, jobject input_stream, jboolean compressed, jlong max_shuffle_read_rows, jlong max_shuffle_read_bytes)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto * global_stream = env->NewGlobalRef(input_stream);
    auto stream_buffer = std::make_unique<local_engine::ReadBufferFromJavaInputStream>(global_stream);
    auto * reader = new local_engine::ShuffleReader(std::move(stream_buffer), compressed, max_shuffle_read_rows, max_shuffle_read_bytes);
    return reinterpret_cast<jlong>(reader);
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Read the next block from the ShuffleReader and return its address.
JNIEXPORT jlong Java_io_glutenproject_vectorized_CHStreamReader_nativeNext(JNIEnv * env, jobject /*obj*/, jlong shuffle_reader)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto * reader_ptr = reinterpret_cast<local_engine::ShuffleReader *>(shuffle_reader);
    DB::Block * next_block = reader_ptr->read();
    return reinterpret_cast<jlong>(next_block);
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Destroy the ShuffleReader at @p shuffle_reader.
JNIEXPORT void Java_io_glutenproject_vectorized_CHStreamReader_nativeClose(JNIEnv * env, jobject /*obj*/, jlong shuffle_reader)
{
    LOCAL_ENGINE_JNI_METHOD_START
    delete reinterpret_cast<local_engine::ShuffleReader *>(shuffle_reader);
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
/// Allocate a BlockCoalesceOperator configured with @p buf_size and return its address.
JNIEXPORT jlong Java_io_glutenproject_vectorized_CHCoalesceOperator_createNativeOperator(JNIEnv * env, jobject /*obj*/, jint buf_size)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto * coalescer = new local_engine::BlockCoalesceOperator(buf_size);
    return reinterpret_cast<jlong>(coalescer);
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Merge a copy of the block at @p block_address into the coalescing operator.
JNIEXPORT void Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeMergeBlock(
    JNIEnv * env, jobject /*obj*/, jlong instance_address, jlong block_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto * coalescer = reinterpret_cast<local_engine::BlockCoalesceOperator *>(instance_address);
    const auto & source_block = *reinterpret_cast<DB::Block *>(block_address);
    // The operator receives its own copy; the caller's block is left untouched.
    DB::Block block_copy(source_block);
    coalescer->mergeBlock(block_copy);
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
/// Whether the coalescing operator reports itself full.
JNIEXPORT jboolean Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeIsFull(JNIEnv * env, jobject /*obj*/, jlong instance_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto * coalescer = reinterpret_cast<local_engine::BlockCoalesceOperator *>(instance_address);
    if (coalescer->isFull())
        return JNI_TRUE;
    return JNI_FALSE;
    LOCAL_ENGINE_JNI_METHOD_END(env, false)
}
/// Take the accumulated block out of the coalescing operator and return its address.
JNIEXPORT jlong Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeRelease(JNIEnv * env, jobject /*obj*/, jlong instance_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto * coalescer = reinterpret_cast<local_engine::BlockCoalesceOperator *>(instance_address);
    return reinterpret_cast<jlong>(coalescer->releaseBlock());
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Destroy the coalescing operator at @p instance_address.
JNIEXPORT void Java_io_glutenproject_vectorized_CHCoalesceOperator_nativeClose(JNIEnv * env, jobject /*obj*/, jlong instance_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    delete reinterpret_cast<local_engine::BlockCoalesceOperator *>(instance_address);
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
// Splitter Jni Wrapper
/// Create a shuffle splitter for local (file-based) shuffle.
/// @param expr_list / out_expr_list  optional serialized expressions (null -> empty string)
/// @param local_dirs                comma-separated list of local directories
/// @param prefer_spill              true -> ShuffleSplitter::create(name, ...); false -> CachedShuffleWriter
/// @return address of a heap-allocated SplitterHolder, or -1 on error
JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nativeMake(
    JNIEnv * env,
    jobject,
    jstring short_name,
    jint num_partitions,
    jbyteArray expr_list,
    jbyteArray out_expr_list,
    jint shuffle_id,
    jlong map_id,
    jint split_size,
    jstring codec,
    jstring data_file,
    jstring local_dirs,
    jint num_sub_dirs,
    jboolean prefer_spill,
    jlong spill_threshold,
    jstring hash_algorithm,
    jboolean throw_if_memory_exceed,
    jboolean flush_block_buffer_before_evict)
{
    LOCAL_ENGINE_JNI_METHOD_START
    // Copy an optional Java byte[] straight into a std::string (no raw new[]/delete[] buffer).
    auto byte_array_to_string = [env](jbyteArray array) -> std::string
    {
        if (array == nullptr)
            return {};
        std::string result(static_cast<size_t>(env->GetArrayLength(array)), '\0');
        env->GetByteArrayRegion(array, 0, static_cast<jsize>(result.size()), reinterpret_cast<jbyte *>(result.data()));
        return result;
    };
    std::string hash_exprs = byte_array_to_string(expr_list);
    std::string out_exprs = byte_array_to_string(out_expr_list);

    Poco::StringTokenizer local_dirs_tokenizer(jstring2string(env, local_dirs), ",");
    std::vector<std::string> local_dirs_list;
    local_dirs_list.insert(local_dirs_list.end(), local_dirs_tokenizer.begin(), local_dirs_tokenizer.end());
    local_engine::SplitOptions options{
        .split_size = static_cast<size_t>(split_size),
        .io_buffer_size = DB::DBMS_DEFAULT_BUFFER_SIZE,
        .data_file = jstring2string(env, data_file),
        .local_dirs_list = std::move(local_dirs_list),
        .num_sub_dirs = num_sub_dirs,
        .shuffle_id = shuffle_id,
        .map_id = static_cast<int>(map_id),
        .partition_num = static_cast<size_t>(num_partitions),
        .hash_exprs = hash_exprs,
        .out_exprs = out_exprs,
        .compress_method = jstring2string(env, codec),
        .spill_threshold = static_cast<size_t>(spill_threshold),
        .hash_algorithm = jstring2string(env, hash_algorithm),
        .throw_if_memory_exceed = static_cast<bool>(throw_if_memory_exceed),
        .flush_block_buffer_before_evict = static_cast<bool>(flush_block_buffer_before_evict)};
    auto name = jstring2string(env, short_name);
    local_engine::SplitterHolder * splitter;
    if (prefer_spill)
        splitter = new local_engine::SplitterHolder{.splitter = local_engine::ShuffleSplitter::create(name, options)};
    else
        splitter = new local_engine::SplitterHolder{.splitter = std::make_unique<local_engine::CachedShuffleWriter>(name, options)};
    return reinterpret_cast<jlong>(splitter);
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Create a CachedShuffleWriter that pushes data through the Java @p pusher (remote shuffle service).
/// @param expr_list / out_expr_list  optional serialized expressions (null -> empty string)
/// @return address of a heap-allocated SplitterHolder, or -1 on error
JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nativeMakeForRSS(
    JNIEnv * env,
    jobject,
    jstring short_name,
    jint num_partitions,
    jbyteArray expr_list,
    jbyteArray out_expr_list,
    jint shuffle_id,
    jlong map_id,
    jint split_size,
    jstring codec,
    jlong spill_threshold,
    jstring hash_algorithm,
    jobject pusher,
    jboolean throw_if_memory_exceed,
    jboolean flush_block_buffer_before_evict)
{
    LOCAL_ENGINE_JNI_METHOD_START
    // Copy an optional Java byte[] straight into a std::string (no raw new[]/delete[] buffer).
    auto byte_array_to_string = [env](jbyteArray array) -> std::string
    {
        if (array == nullptr)
            return {};
        std::string result(static_cast<size_t>(env->GetArrayLength(array)), '\0');
        env->GetByteArrayRegion(array, 0, static_cast<jsize>(result.size()), reinterpret_cast<jbyte *>(result.data()));
        return result;
    };
    std::string hash_exprs = byte_array_to_string(expr_list);
    std::string out_exprs = byte_array_to_string(out_expr_list);

    local_engine::SplitOptions options{
        .split_size = static_cast<size_t>(split_size),
        .io_buffer_size = DB::DBMS_DEFAULT_BUFFER_SIZE,
        .shuffle_id = shuffle_id,
        .map_id = static_cast<int>(map_id),
        .partition_num = static_cast<size_t>(num_partitions),
        .hash_exprs = hash_exprs,
        .out_exprs = out_exprs,
        .compress_method = jstring2string(env, codec),
        .spill_threshold = static_cast<size_t>(spill_threshold),
        .hash_algorithm = jstring2string(env, hash_algorithm),
        .throw_if_memory_exceed = static_cast<bool>(throw_if_memory_exceed),
        .flush_block_buffer_before_evict = static_cast<bool>(flush_block_buffer_before_evict)};
    auto name = jstring2string(env, short_name);
    auto * splitter
        = new local_engine::SplitterHolder{.splitter = std::make_unique<local_engine::CachedShuffleWriter>(name, options, pusher)};
    return reinterpret_cast<jlong>(splitter);
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Push one native block (passed by address) into the splitter held by `splitterId`.
JNIEXPORT void Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_split(JNIEnv * env, jobject, jlong splitterId, jlong block)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto & holder = *reinterpret_cast<local_engine::SplitterHolder *>(splitterId);
    auto & input_block = *reinterpret_cast<DB::Block *>(block);
    holder.splitter->split(input_block);
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
/// Ask the splitter to evict its buffered partitions; returns the value of evictPartitions() as a jlong
/// (0 is presumably returned by LOCAL_ENGINE_JNI_METHOD_END on exception).
JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_evict(JNIEnv * env, jobject, jlong splitterId)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto * holder = reinterpret_cast<local_engine::SplitterHolder *>(splitterId);
    return holder->splitter->evictPartitions();
    LOCAL_ENGINE_JNI_METHOD_END(env, 0)
}
/// Stop the splitter and package its statistics into a Java split-result object.
///
/// The per-partition lengths and raw lengths are copied into fresh Java long[] arrays. Both source
/// vectors are reinterpreted as jlong, so their element type must be 64-bit.
JNIEXPORT jobject Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_stop(JNIEnv * env, jobject, jlong splitterId)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto * holder = reinterpret_cast<local_engine::SplitterHolder *>(splitterId);
    auto result = holder->splitter->stop();

    /// Allocate a Java long[] of matching size and copy `lengths` into it.
    auto to_java_long_array = [env](const auto & lengths)
    {
        auto * java_array = env->NewLongArray(lengths.size());
        env->SetLongArrayRegion(java_array, 0, lengths.size(), reinterpret_cast<const jlong *>(lengths.data()));
        return java_array;
    };
    auto * partition_length_arr = to_java_long_array(result.partition_lengths);
    auto * raw_partition_length_arr = to_java_long_array(result.raw_partition_lengths);

    return env->NewObject(
        split_result_class,
        split_result_constructor,
        result.total_compute_pid_time,
        result.total_write_time,
        result.total_spill_time,
        result.total_compress_time,
        result.total_bytes_written,
        result.total_bytes_spilled,
        partition_length_arr,
        raw_partition_length_arr,
        result.total_split_time,
        result.total_io_time,
        result.total_serialize_time);
    LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
/// Destroy a SplitterHolder created by one of the CHShuffleSplitterJniWrapper factory functions
/// (e.g. nativeMakeForRSS).
JNIEXPORT void Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_close(JNIEnv * env, jobject, jlong splitterId)
{
    LOCAL_ENGINE_JNI_METHOD_START
    delete reinterpret_cast<local_engine::SplitterHolder *>(splitterId);
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
// CHBlockConverterJniWrapper
/// Convert a native block into Spark row format.
///
/// @param block_address  address of a DB::Block owned by the caller.
/// @param masks          optional int[] of row indices; each value is copied into a size_t mask vector.
///                       A null array leaves the mask empty (nullptr).
/// @return a SparkRowInfo Java object carrying per-row offsets and lengths, the native buffer address,
///         the column count and the total byte size. The buffer is presumably released later via freeMemory.
JNIEXPORT jobject
Java_io_glutenproject_vectorized_CHBlockConverterJniWrapper_convertColumnarToRow(JNIEnv * env, jclass, jlong block_address, jintArray masks)
{
    LOCAL_ENGINE_JNI_METHOD_START
    local_engine::CHColumnToSparkRow converter;
    local_engine::MaskVector mask = nullptr;
    auto * block = reinterpret_cast<DB::Block *>(block_address);
    if (masks != nullptr)
    {
        const jsize size = env->GetArrayLength(masks);
        /// Allocate before pinning, so nothing can throw while the Java array is held.
        mask = std::make_unique<std::vector<size_t>>();
        mask->reserve(static_cast<size_t>(size));
        jint * values = env->GetIntArrayElements(masks, nullptr);
        if (values == nullptr)
            return nullptr; /// JNI reports failure this way (typically with OutOfMemoryError pending)
        for (jsize j = 0; j < size; ++j)
            mask->push_back(static_cast<size_t>(values[j]));
        env->ReleaseIntArrayElements(masks, values, JNI_ABORT);
    }
    std::unique_ptr<local_engine::SparkRowInfo> spark_row_info = converter.convertCHColumnToSparkRow(*block, mask);
    const auto num_rows = spark_row_info->getNumRows();

    auto * offsets_arr = env->NewLongArray(num_rows);
    env->SetLongArrayRegion(offsets_arr, 0, num_rows, reinterpret_cast<const jlong *>(spark_row_info->getOffsets().data()));
    auto * lengths_arr = env->NewLongArray(num_rows);
    env->SetLongArrayRegion(lengths_arr, 0, num_rows, reinterpret_cast<const jlong *>(spark_row_info->getLengths().data()));

    const auto address = reinterpret_cast<int64_t>(spark_row_info->getBufferAddress());
    /// static_cast, not reinterpret_cast: these are integer-to-integer conversions.
    const auto column_number = static_cast<int64_t>(spark_row_info->getNumCols());
    const auto total_size = static_cast<int64_t>(spark_row_info->getTotalBytes());
    return env->NewObject(spark_row_info_class, spark_row_info_constructor, offsets_arr, lengths_arr, address, column_number, total_size);
    LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
/// Release a native row buffer of `size` bytes at `address` through CHColumnToSparkRow::freeMem
/// (presumably a buffer produced by convertColumnarToRow).
JNIEXPORT void Java_io_glutenproject_vectorized_CHBlockConverterJniWrapper_freeMemory(JNIEnv * env, jclass, jlong address, jlong size)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto * buffer = reinterpret_cast<char *>(address);
    local_engine::CHColumnToSparkRow row_converter;
    row_converter.freeMem(buffer, size);
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
/// Build a native block from a Java iterator of Spark rows.
///
/// @param java_iter  Java iterator handed to SparkRowToCHColumn::convertSparkRowItrToCHColumn.
/// @param names      String[] of column names.
/// @param types      byte[][] of per-column serialized types, indexed in parallel with `names`.
/// @return address of the resulting DB::Block (presumably freed via freeBlock), or -1 on failure.
JNIEXPORT jlong Java_io_glutenproject_vectorized_CHBlockConverterJniWrapper_convertSparkRowsToCHColumn(
    JNIEnv * env, jclass, jobject java_iter, jobjectArray names, jobjectArray types)
{
    LOCAL_ENGINE_JNI_METHOD_START
    const jsize num_columns = env->GetArrayLength(names);
    std::vector<std::string> c_names;
    std::vector<std::string> c_types;
    /// Both vectors receive exactly one entry per column.
    c_names.reserve(num_columns);
    c_types.reserve(num_columns);
    for (jsize i = 0; i < num_columns; ++i)
    {
        auto * name = static_cast<jstring>(env->GetObjectArrayElement(names, i));
        c_names.emplace_back(jstring2string(env, name));
        env->DeleteLocalRef(name);

        /// Copy the type bytes straight into the string. Unlike Get/ReleaseByteArrayElements, nothing
        /// stays pinned if an allocation throws in between.
        auto * type = static_cast<jbyteArray>(env->GetObjectArrayElement(types, i));
        const jsize type_length = env->GetArrayLength(type);
        std::string str_type(static_cast<size_t>(type_length), '\0');
        env->GetByteArrayRegion(type, 0, type_length, reinterpret_cast<jbyte *>(str_type.data()));
        c_types.emplace_back(std::move(str_type));
        env->DeleteLocalRef(type);
    }
    local_engine::SparkRowToCHColumn converter;
    auto * block = converter.convertSparkRowItrToCHColumn(java_iter, c_names, c_types);
    return reinterpret_cast<jlong>(block);
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Free the DB::Block at `block_address` through SparkRowToCHColumn::freeBlock
/// (presumably a block returned by convertSparkRowsToCHColumn).
JNIEXPORT void Java_io_glutenproject_vectorized_CHBlockConverterJniWrapper_freeBlock(JNIEnv * env, jclass, jlong block_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto * target = reinterpret_cast<DB::Block *>(block_address);
    local_engine::SparkRowToCHColumn row_converter;
    row_converter.freeBlock(target);
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
/// Allocate a NativeWriterInMemory and return its address; it is deleted by CHBlockWriterJniWrapper_nativeClose.
JNIEXPORT jlong Java_io_glutenproject_vectorized_CHBlockWriterJniWrapper_nativeCreateInstance(JNIEnv * env, jobject)
{
    LOCAL_ENGINE_JNI_METHOD_START
    return reinterpret_cast<jlong>(new local_engine::NativeWriterInMemory());
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Serialize one native block into the in-memory writer at `instance`.
JNIEXPORT void
Java_io_glutenproject_vectorized_CHBlockWriterJniWrapper_nativeWrite(JNIEnv * env, jobject, jlong instance, jlong block_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto & in_memory_writer = *reinterpret_cast<local_engine::NativeWriterInMemory *>(instance);
    auto & input_block = *reinterpret_cast<DB::Block *>(block_address);
    in_memory_writer.write(input_block);
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
/// Return the size of the writer's collected output, narrowed to jint (-1 presumably on failure).
JNIEXPORT jint Java_io_glutenproject_vectorized_CHBlockWriterJniWrapper_nativeResultSize(JNIEnv * env, jobject, jlong instance)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto * in_memory_writer = reinterpret_cast<local_engine::NativeWriterInMemory *>(instance);
    const auto collected_size = in_memory_writer->collect().size();
    return static_cast<jint>(collected_size);
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Copy the writer's collected bytes into the caller-provided Java byte[] `result`, starting at index 0.
/// NOTE(review): assumes `result` is at least nativeResultSize() bytes long.
JNIEXPORT void
Java_io_glutenproject_vectorized_CHBlockWriterJniWrapper_nativeCollect(JNIEnv * env, jobject, jlong instance, jbyteArray result)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto * in_memory_writer = reinterpret_cast<local_engine::NativeWriterInMemory *>(instance);
    auto bytes = in_memory_writer->collect();
    const auto * payload = reinterpret_cast<const jbyte *>(bytes.data());
    env->SetByteArrayRegion(result, 0, bytes.size(), payload);
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
/// Delete the NativeWriterInMemory created by CHBlockWriterJniWrapper_nativeCreateInstance.
JNIEXPORT void Java_io_glutenproject_vectorized_CHBlockWriterJniWrapper_nativeClose(JNIEnv * env, jobject, jlong instance)
{
    LOCAL_ENGINE_JNI_METHOD_START
    delete reinterpret_cast<local_engine::NativeWriterInMemory *>(instance);
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
/// Create a file writer for `file_uri_` with the given column names.
///
/// @param names_        String[] of output column names.
/// @param format_hint_  explicit output format, passed to createFileWriterWrapper.
/// @return address of the writer returned by local_engine::createFileWriterWrapper, or 0 on failure.
JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_nativeInitFileWriterWrapper(
    JNIEnv * env, jobject, jstring file_uri_, jobjectArray names_, jstring format_hint_)
{
    LOCAL_ENGINE_JNI_METHOD_START
    const jsize column_count = env->GetArrayLength(names_);
    std::vector<std::string> column_names;
    column_names.reserve(column_count);
    for (jsize idx = 0; idx < column_count; ++idx)
    {
        auto * column_name = static_cast<jstring>(env->GetObjectArrayElement(names_, idx));
        column_names.emplace_back(jstring2string(env, column_name));
        env->DeleteLocalRef(column_name);
    }
    auto file_uri = jstring2string(env, file_uri_);
    auto format_hint = jstring2string(env, format_hint_);
    /// For HiveFileFormat the file URI may not end with ".parquet", so the format is passed in explicitly as a hint.
    return reinterpret_cast<jlong>(local_engine::createFileWriterWrapper(file_uri, column_names, format_hint));
    LOCAL_ENGINE_JNI_METHOD_END(env, 0)
}
/// Create a SparkMergeTreeWriter for one write task.
///
/// @param plan_         serialized substrait::Plan; the root input of its first relation is parsed into the target storage.
/// @param split_info_   serialized ExtensionTable, parsed by SerializedPlanParser::parseExtensionTable.
/// @param uuid_         combined with `task_id_` as "<uuid>_<task_id>" and passed to the writer.
/// @param partition_dir_ / bucket_dir_  forwarded to SparkMergeTreeWriter.
/// @return address of a heap-allocated SparkMergeTreeWriter (deleted by closeMergeTreeWriter), or 0 on failure.
JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_nativeInitMergeTreeWriterWrapper(
    JNIEnv * env, jobject, jbyteArray plan_, jbyteArray split_info_, jstring uuid_, jstring task_id_, jstring partition_dir_, jstring bucket_dir_)
{
    LOCAL_ENGINE_JNI_METHOD_START
    const auto uuid_str = jstring2string(env, uuid_);
    const auto task_id = jstring2string(env, task_id_);
    const auto partition_dir = jstring2string(env, partition_dir_);
    const auto bucket_dir = jstring2string(env, bucket_dir_);

    /// Copy both byte arrays into owned strings up front. Previously they stayed pinned via
    /// GetByteArrayElements until the end of the function, so any exception while parsing (including
    /// the explicit CANNOT_PARSE_PROTOBUF_SCHEMA throw) skipped the release and leaked them.
    auto byte_array_to_string = [env](jbyteArray array) -> std::string
    {
        const jsize size = env->GetArrayLength(array);
        std::string result(static_cast<size_t>(size), '\0');
        env->GetByteArrayRegion(array, 0, size, reinterpret_cast<jbyte *>(result.data()));
        return result;
    };
    const std::string plan_str = byte_array_to_string(plan_);
    std::string split_info_str = byte_array_to_string(split_info_);

    auto plan_ptr = std::make_unique<substrait::Plan>();
    /// https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data
    /// Parsing may fail when the number of recursive layers is large.
    /// Here, set a limit large enough to avoid this problem.
    /// Once this problem occurs, it is difficult to troubleshoot, because the pb of c++ will not provide any valid information
    google::protobuf::io::CodedInputStream coded_in(
        reinterpret_cast<const uint8_t *>(plan_str.data()), static_cast<int>(plan_str.size()));
    coded_in.SetRecursionLimit(100000);
    if (!plan_ptr->ParseFromCodedStream(&coded_in))
        throw DB::Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::Plan from string failed");
    /// relations()[0] below is unchecked indexing; reject an empty plan explicitly.
    if (plan_ptr->relations_size() == 0)
        throw DB::Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "substrait::Plan has no relations");

    substrait::ReadRel::ExtensionTable extension_table = local_engine::SerializedPlanParser::parseExtensionTable(split_info_str);
    auto storage = local_engine::MergeTreeRelParser::parseStorage(
        plan_ptr->relations()[0].root().input(), extension_table, local_engine::SerializedPlanParser::global_context);
    auto uuid = uuid_str + "_" + task_id;
    auto * writer = new local_engine::SparkMergeTreeWriter(
        *storage, storage->getInMemoryMetadataPtr(), local_engine::SerializedPlanParser::global_context, uuid, partition_dir, bucket_dir);
    return reinterpret_cast<jlong>(writer);
    LOCAL_ENGINE_JNI_METHOD_END(env, 0)
}
/// Pass one native block to the NormalFileWriter at `instanceId` via consume().
JNIEXPORT void
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_write(JNIEnv * env, jobject, jlong instanceId, jlong block_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto & file_writer = *reinterpret_cast<local_engine::NormalFileWriter *>(instanceId);
    auto & input_block = *reinterpret_cast<DB::Block *>(block_address);
    file_writer.consume(input_block);
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
/// Close the NormalFileWriter at `instanceId`, then delete it. If close() throws, the writer is not deleted.
JNIEXPORT void Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_close(JNIEnv * env, jobject, jlong instanceId)
{
    LOCAL_ENGINE_JNI_METHOD_START
    local_engine::NormalFileWriter * const file_writer = reinterpret_cast<local_engine::NormalFileWriter *>(instanceId);
    file_writer->close();
    delete file_writer;
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
/// Write one native block through the SparkMergeTreeWriter at `instanceId`.
JNIEXPORT void
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_writeToMergeTree(JNIEnv * env, jobject, jlong instanceId, jlong block_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto & merge_tree_writer = *reinterpret_cast<local_engine::SparkMergeTreeWriter *>(instanceId);
    auto & input_block = *reinterpret_cast<DB::Block *>(block_address);
    merge_tree_writer.write(input_block);
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
/// Finalize the SparkMergeTreeWriter at `instanceId` and collect its part infos as JSON. The writer is then
/// deleted and the JSON returned as a Java string. If finalize() throws, the writer is not deleted.
JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_closeMergeTreeWriter(JNIEnv * env, jobject, jlong instanceId)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto * merge_tree_writer = reinterpret_cast<local_engine::SparkMergeTreeWriter *>(instanceId);
    merge_tree_writer->finalize();
    auto written_parts = merge_tree_writer->getAllPartInfo();
    const auto parts_json = local_engine::SparkMergeTreeWriter::partInfosToJson(written_parts);
    delete merge_tree_writer;
    return stringTojstring(env, parts_json.c_str());
    LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
/// Split a native block into stripes using BlockStripeSplitter.
///
/// @param blockAddress                address of the DB::Block to split.
/// @param partitionColIndice          int[] of partition column indices (converted to size_t).
/// @param hasBucket                   forwarded to BlockStripeSplitter::split.
/// @param reserve_partition_columns   forwarded to BlockStripeSplitter::split.
/// @return a BlockStripes Java object built from the origin block address, the stripe block addresses,
///         the heading row indices, and the origin block's column count.
JNIEXPORT jobject Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_splitBlockByPartitionAndBucket(
    JNIEnv * env, jclass, jlong blockAddress, jintArray partitionColIndice, jboolean hasBucket, jboolean reserve_partition_columns)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto * block = reinterpret_cast<DB::Block *>(blockAddress);

    /// Copy the indices with GetIntArrayRegion. No pinned buffer is held while the vector allocates,
    /// so nothing leaks if an allocation throws, and each vector is sized once.
    const jsize size = env->GetArrayLength(partitionColIndice);
    std::vector<jint> raw_indices(static_cast<size_t>(size));
    env->GetIntArrayRegion(partitionColIndice, 0, size, raw_indices.data());
    std::vector<size_t> partition_col_indice_vec;
    partition_col_indice_vec.reserve(raw_indices.size());
    for (const jint index : raw_indices)
        partition_col_indice_vec.push_back(static_cast<size_t>(index));

    local_engine::BlockStripes bs = local_engine::BlockStripeSplitter::split(*block, partition_col_indice_vec, hasBucket, reserve_partition_columns);
    auto * addresses = env->NewLongArray(bs.block_addresses.size());
    env->SetLongArrayRegion(addresses, 0, bs.block_addresses.size(), bs.block_addresses.data());
    auto * indices = env->NewIntArray(bs.heading_row_indice.size());
    env->SetIntArrayRegion(indices, 0, bs.heading_row_indice.size(), bs.heading_row_indice.data());
    return env->NewObject(
        block_stripes_class, block_stripes_constructor, bs.origin_block_address, addresses, indices, bs.origin_block_num_columns);
    LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
/// Build a broadcast join table from compressed data in a Java byte[].
///
/// @param key           hash table id passed to BroadCastJoinBuilder::buildJoin.
/// @param in            compressed serialized build-side data, read through a CompressedReadBuffer.
/// @param row_count_    forwarded to buildJoin.
/// @param join_key_     forwarded to buildJoin as a string.
/// @param join_type_    cast to substrait::JoinRel_JoinType.
/// @param named_struct  serialized struct bytes, forwarded to buildJoin as a string.
/// @return the handle of a SharedPointerWrapper around the built join, or 0 on failure.
JNIEXPORT jlong Java_io_glutenproject_vectorized_StorageJoinBuilder_nativeBuild(
    JNIEnv * env, jclass, jstring key, jbyteArray in, jlong row_count_, jstring join_key_, jint join_type_, jbyteArray named_struct)
{
    LOCAL_ENGINE_JNI_METHOD_START
    const auto hash_table_id = jstring2string(env, key);
    const auto join_key = jstring2string(env, join_key_);
    /// Copy the struct bytes instead of pinning them. Previously, if buildJoin threw, the pinned array
    /// was never released.
    const jsize struct_size = env->GetArrayLength(named_struct);
    std::string struct_string(static_cast<size_t>(struct_size), '\0');
    env->GetByteArrayRegion(named_struct, 0, struct_size, reinterpret_cast<jbyte *>(struct_string.data()));
    const auto join_type = static_cast<substrait::JoinRel_JoinType>(join_type_);
    const jsize length = env->GetArrayLength(in);
    local_engine::ReadBufferFromByteArray read_buffer_from_java_array(in, length);
    DB::CompressedReadBuffer input(read_buffer_from_java_array);
    local_engine::configureCompressedReadBuffer(input);
    const auto * obj = local_engine::make_wrapper(
        local_engine::BroadCastJoinBuilder::buildJoin(hash_table_id, input, row_count_, join_key, join_type, struct_string));
    return obj->instance();
    LOCAL_ENGINE_JNI_METHOD_END(env, 0)
}
/// Create a second SharedPointerWrapper sharing the join table behind `instance`; returns the new handle.
JNIEXPORT jlong Java_io_glutenproject_vectorized_StorageJoinBuilder_nativeCloneBuildHashTable(JNIEnv * env, jclass, jlong instance)
{
    LOCAL_ENGINE_JNI_METHOD_START
    using JoinWrapper = local_engine::SharedPointerWrapper<local_engine::StorageJoinFromReadBuffer>;
    auto * cloned = local_engine::make_wrapper(JoinWrapper::sharedPtr(instance));
    return cloned->instance();
    LOCAL_ENGINE_JNI_METHOD_END(env, 0)
}
/// Delegate cleanup of the join table `instance`, registered under `hash_table_id_`, to
/// BroadCastJoinBuilder::cleanBuildHashTable.
JNIEXPORT void
Java_io_glutenproject_vectorized_StorageJoinBuilder_nativeCleanBuildHashTable(JNIEnv * env, jclass, jstring hash_table_id_, jlong instance)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto table_id = jstring2string(env, hash_table_id_);
    local_engine::BroadCastJoinBuilder::cleanBuildHashTable(table_id, instance);
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
// BlockSplitIterator
/// Create a NativeSplitter reading from the Java iterator `in`.
///
/// @param schema  optional; a null jstring leaves the schema buffer empty.
/// @return address of a heap-allocated NativeSplitter::Holder (deleted by BlockSplitIterator_nativeClose), or -1 on failure.
JNIEXPORT jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeCreate(
    JNIEnv * env,
    jobject,
    jobject in,
    jstring name,
    jstring expr,
    jstring schema,
    jint partition_num,
    jint buffer_size,
    jstring hash_algorithm)
{
    LOCAL_ENGINE_JNI_METHOD_START
    local_engine::NativeSplitter::Options options;
    options.partition_num = partition_num;
    options.buffer_size = buffer_size;
    options.hash_algorithm = jstring2string(env, hash_algorithm);
    options.exprs_buffer = jstring2string(env, expr);
    options.schema_buffer = schema ? jstring2string(env, schema) : std::string();
    auto * holder = new local_engine::NativeSplitter::Holder{
        .splitter = local_engine::NativeSplitter::create(jstring2string(env, name), options, in)};
    return reinterpret_cast<jlong>(holder);
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Delete the NativeSplitter::Holder created by BlockSplitIterator_nativeCreate.
JNIEXPORT void Java_io_glutenproject_vectorized_BlockSplitIterator_nativeClose(JNIEnv * env, jobject, jlong instance)
{
    LOCAL_ENGINE_JNI_METHOD_START
    delete reinterpret_cast<local_engine::NativeSplitter::Holder *>(instance);
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
/// Report whether the splitter has another block to return.
JNIEXPORT jboolean Java_io_glutenproject_vectorized_BlockSplitIterator_nativeHasNext(JNIEnv * env, jobject, jlong instance)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto & holder = *reinterpret_cast<local_engine::NativeSplitter::Holder *>(instance);
    return holder.splitter->hasNext();
    LOCAL_ENGINE_JNI_METHOD_END(env, false)
}
/// Return the address of the splitter's next block. The error value is 0 (previously spelled `false`).
JNIEXPORT jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeNext(JNIEnv * env, jobject, jlong instance)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto & holder = *reinterpret_cast<local_engine::NativeSplitter::Holder *>(instance);
    auto * next_block = holder.splitter->next();
    return reinterpret_cast<jlong>(next_block);
    LOCAL_ENGINE_JNI_METHOD_END(env, 0)
}
/// Return the partition id of the block most recently produced by the splitter, as jint (-1 presumably on failure).
JNIEXPORT jint Java_io_glutenproject_vectorized_BlockSplitIterator_nativeNextPartitionId(JNIEnv * env, jobject, jlong instance)
{
    LOCAL_ENGINE_JNI_METHOD_START
    local_engine::NativeSplitter::Holder * splitter = reinterpret_cast<local_engine::NativeSplitter::Holder *>(instance);
    /// static_cast is the correct integer conversion. reinterpret_cast only compiled because the source
    /// type happens to be exactly jint, and it would break if nextPartitionId()'s type changed.
    return static_cast<jint>(splitter->splitter->nextPartitionId());
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Create a ShuffleWriter over the Java `output_stream`, using the caller's `buffer` and the named `codec`.
/// The writer is flushed and deleted by BlockOutputStream_nativeClose.
JNIEXPORT jlong Java_io_glutenproject_vectorized_BlockOutputStream_nativeCreate(
    JNIEnv * env, jobject, jobject output_stream, jbyteArray buffer, jstring codec, jboolean compressed, jint customize_buffer_size)
{
    LOCAL_ENGINE_JNI_METHOD_START
    return reinterpret_cast<jlong>(
        new local_engine::ShuffleWriter(output_stream, buffer, jstring2string(env, codec), compressed, customize_buffer_size));
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Flush the ShuffleWriter, then delete it. If flush() throws, the writer is not deleted.
JNIEXPORT void Java_io_glutenproject_vectorized_BlockOutputStream_nativeClose(JNIEnv * env, jobject, jlong instance)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto * const shuffle_writer = reinterpret_cast<local_engine::ShuffleWriter *>(instance);
    shuffle_writer->flush();
    delete shuffle_writer;
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
/// Write one native block through the ShuffleWriter at `instance`.
JNIEXPORT void Java_io_glutenproject_vectorized_BlockOutputStream_nativeWrite(JNIEnv * env, jobject, jlong instance, jlong block_address)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto & shuffle_writer = *reinterpret_cast<local_engine::ShuffleWriter *>(instance);
    auto & input_block = *reinterpret_cast<DB::Block *>(block_address);
    shuffle_writer.write(input_block);
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
/// Flush the ShuffleWriter at `instance` without destroying it.
JNIEXPORT void Java_io_glutenproject_vectorized_BlockOutputStream_nativeFlush(JNIEnv * env, jobject, jlong instance)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto & shuffle_writer = *reinterpret_cast<local_engine::ShuffleWriter *>(instance);
    shuffle_writer.flush();
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
/// Parse a serialized plan over the Java iterator `input` and start a LocalExecutor for it.
///
/// @param input  Java iterator; a global reference to it is registered with the parser via addInputIter.
/// @param plan   serialized plan bytes, parsed by SerializedPlanParser::parse.
/// @return address of a heap-allocated LocalExecutor (deleted by SimpleExpressionEval_nativeClose), or -1 on failure.
JNIEXPORT jlong
Java_io_glutenproject_vectorized_SimpleExpressionEval_createNativeInstance(JNIEnv * env, jclass, jobject input, jbyteArray plan)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto context = DB::Context::createCopy(local_engine::SerializedPlanParser::global_context);
    local_engine::SerializedPlanParser parser(context);
    jobject iter = env->NewGlobalRef(input);
    parser.addInputIter(iter, false);

    /// Copy the plan bytes instead of pinning them. Previously the pinned array was released only after
    /// a successful parse and execute, so a throwing parse leaked it.
    const jsize plan_size = env->GetArrayLength(plan);
    std::string plan_string(static_cast<size_t>(plan_size), '\0');
    env->GetByteArrayRegion(plan, 0, plan_size, reinterpret_cast<jbyte *>(plan_string.data()));

    auto query_plan = parser.parse(plan_string);
    /// Keep ownership until execute() succeeds, so a throwing execute() does not leak the executor.
    auto executor = std::make_unique<local_engine::LocalExecutor>(parser.query_context, context);
    executor->execute(std::move(query_plan));
    return reinterpret_cast<jlong>(executor.release());
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Delete the LocalExecutor created by SimpleExpressionEval_createNativeInstance.
JNIEXPORT void Java_io_glutenproject_vectorized_SimpleExpressionEval_nativeClose(JNIEnv * env, jclass, jlong instance)
{
    LOCAL_ENGINE_JNI_METHOD_START
    delete reinterpret_cast<local_engine::LocalExecutor *>(instance);
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
/// Report whether the executor has more output.
JNIEXPORT jboolean Java_io_glutenproject_vectorized_SimpleExpressionEval_nativeHasNext(JNIEnv * env, jclass, jlong instance)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto & local_executor = *reinterpret_cast<local_engine::LocalExecutor *>(instance);
    return local_executor.hasNext();
    LOCAL_ENGINE_JNI_METHOD_END(env, false)
}
/// Return the address of the executor's next columnar result.
JNIEXPORT jlong Java_io_glutenproject_vectorized_SimpleExpressionEval_nativeNext(JNIEnv * env, jclass, jlong instance)
{
    LOCAL_ENGINE_JNI_METHOD_START
    auto & local_executor = *reinterpret_cast<local_engine::LocalExecutor *>(instance);
    auto * next_columnar = local_executor.nextColumnar();
    return reinterpret_cast<jlong>(next_columnar);
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Always returns -1. NOTE(review): presumably callers treat -1 as "no default allocator"; confirm on the Java side.
JNIEXPORT jlong Java_io_glutenproject_memory_alloc_CHNativeMemoryAllocator_getDefaultAllocator(JNIEnv *, jclass)
{
    constexpr jlong no_default_allocator = -1;
    return no_default_allocator;
}
/// Wrap a global reference to the Java `listener` in a ReservationListenerWrapper and return the id
/// produced by local_engine::initializeQuery.
JNIEXPORT jlong Java_io_glutenproject_memory_alloc_CHNativeMemoryAllocator_createListenableAllocator(JNIEnv * env, jclass, jobject listener)
{
    LOCAL_ENGINE_JNI_METHOD_START
    jobject listener_ref = env->NewGlobalRef(listener);
    auto reservation_listener = std::make_shared<local_engine::ReservationListenerWrapper>(listener_ref);
    return local_engine::initializeQuery(reservation_listener);
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
/// Release the allocator identified by `allocator_id` (an id returned by createListenableAllocator).
JNIEXPORT void Java_io_glutenproject_memory_alloc_CHNativeMemoryAllocator_releaseAllocator(JNIEnv * env, jclass, jlong allocator_id)
{
    LOCAL_ENGINE_JNI_METHOD_START
    const jlong target_allocator = allocator_id;
    local_engine::releaseAllocator(target_allocator);
    LOCAL_ENGINE_JNI_METHOD_END(env, )
}
/// Return the memory usage reported by local_engine::allocatorMemoryUsage for `allocator_id`
/// (-1 presumably on failure).
JNIEXPORT jlong Java_io_glutenproject_memory_alloc_CHNativeMemoryAllocator_bytesAllocated(JNIEnv * env, jclass, jlong allocator_id)
{
    LOCAL_ENGINE_JNI_METHOD_START
    const auto usage = local_engine::allocatorMemoryUsage(allocator_id);
    return usage;
    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
#ifdef __cplusplus
}
#endif