in cpp/core/jni/JniWrapper.cc [794:906]
/// JNI entry point for ShuffleWriterJniWrapper.nativeMake.
///
/// Builds a native shuffle writer and stores it in the runtime's object store. Steps:
///  1. Build ShuffleWriterOptions: buffer size, realloc threshold, partitioning parsed from
///     partitioningNameJstr, task attempt id, start partition id, and the id of the calling Java
///     thread obtained via Thread.currentThread().getId().
///  2. Build PartitionWriterOptions: merge and compression settings. The codec backend and
///     compression mode are only read when codecJstr is non-null.
///  3. Create the partition writer named by partitionWriterTypeJstr:
///     - "local"    -> LocalPartitionWriter (requires dataFileJstr and localDirsJstr)
///     - "celeborn" -> CelebornPartitionWriter pushing through partitionPusher
///
/// NOTE(review): firstBatchHandle is not read by this function.
///
/// @return the handle produced by ctx->objectStore()->save(...) for the new shuffle writer.
///         On a thrown exception, JNI_METHOD_END(kInvalidResourceHandle) is responsible for the
///         result; presumably it raises a Java exception and returns kInvalidResourceHandle.
///         Confirm this in the macro definition.
/// @throws gluten::GlutenException on a null required argument, an unknown writer type, or a
///         failed JNI lookup.
JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper_nativeMake( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jstring partitioningNameJstr,
    jint numPartitions,
    jint bufferSize,
    jint mergeBufferSize,
    jdouble mergeThreshold,
    jstring codecJstr,
    jstring codecBackendJstr,
    jint compressionThreshold,
    jstring compressionModeJstr,
    jstring dataFileJstr,
    jint numSubDirs,
    jstring localDirsJstr,
    jlong memoryManagerHandle,
    jdouble reallocThreshold,
    jlong firstBatchHandle,
    jlong taskAttemptId,
    jint startPartitionId,
    jint pushBufferMaxSize,
    jobject partitionPusher,
    jstring partitionWriterTypeJstr) {
  JNI_METHOD_START
  auto ctx = gluten::getRuntime(env, wrapper);
  auto memoryManager = jniCastOrThrow<MemoryManager>(memoryManagerHandle);
  if (partitioningNameJstr == nullptr) {
    throw gluten::GlutenException(std::string("Short partitioning name can't be null"));
  }
  // Reading a null jstring through the JNI string functions is undefined behaviour, so validate
  // the writer type up front like the other required strings.
  if (partitionWriterTypeJstr == nullptr) {
    throw gluten::GlutenException(std::string("Partition writer type can't be null"));
  }

  auto shuffleWriterOptions = ShuffleWriterOptions{
      .bufferSize = bufferSize,
      .bufferReallocThreshold = reallocThreshold,
      .partitioning = gluten::toPartitioning(jStringToCString(env, partitioningNameJstr)),
      .taskAttemptId = static_cast<int64_t>(taskAttemptId),
      .startPartitionId = startPartitionId,
  };

  // Record the id of the Java thread that is creating the writer: Thread.currentThread().getId().
  // Check for a pending exception after each lookup. Issuing further JNI calls while an
  // exception is pending (or with a null class/method) is not allowed.
  jclass threadClass = env->FindClass("java/lang/Thread");
  checkException(env);
  jmethodID currentThreadMethod = env->GetStaticMethodID(threadClass, "currentThread", "()Ljava/lang/Thread;");
  checkException(env);
  jobject thread = env->CallStaticObjectMethod(threadClass, currentThreadMethod);
  checkException(env);
  if (thread == nullptr) {
    LOG(WARNING) << "Thread.currentThread() return NULL";
  } else {
    jmethodID getIdMethod = getMethodIdOrError(env, threadClass, "getId", "()J");
    jlong threadId = env->CallLongMethod(thread, getIdMethod);
    checkException(env);
    shuffleWriterOptions.threadId = static_cast<int64_t>(threadId);
    env->DeleteLocalRef(thread);
  }
  env->DeleteLocalRef(threadClass);

  auto partitionWriterOptions = PartitionWriterOptions{
      .mergeBufferSize = mergeBufferSize,
      .mergeThreshold = mergeThreshold,
      .compressionThreshold = compressionThreshold,
      .compressionType = getCompressionType(env, codecJstr),
      .bufferedWrite = true,
      .numSubDirs = numSubDirs,
      // Non-positive values fall back to the default buffer size.
      .pushBufferMaxSize = pushBufferMaxSize > 0 ? pushBufferMaxSize : kDefaultShuffleWriterBufferSize};
  if (codecJstr != nullptr) {
    partitionWriterOptions.codecBackend = getCodecBackend(env, codecBackendJstr);
    partitionWriterOptions.compressionMode = getCompressionMode(env, compressionModeJstr);
  }

  std::unique_ptr<PartitionWriter> partitionWriter;
  const auto partitionWriterType = jStringToCString(env, partitionWriterTypeJstr);
  if (partitionWriterType == "local") {
    if (dataFileJstr == nullptr) {
      throw gluten::GlutenException(std::string("Shuffle DataFile can't be null"));
    }
    if (localDirsJstr == nullptr) {
      throw gluten::GlutenException(std::string("Shuffle local dirs can't be null"));
    }
    auto dataFile = jStringToCString(env, dataFileJstr);
    auto configuredDirs = gluten::splitPaths(jStringToCString(env, localDirsJstr));
    partitionWriter = std::make_unique<LocalPartitionWriter>(
        numPartitions,
        std::move(partitionWriterOptions),
        memoryManager->getArrowMemoryPool(),
        std::move(dataFile),
        std::move(configuredDirs));
  } else if (partitionWriterType == "celeborn") {
    if (partitionPusher == nullptr) {
      throw gluten::GlutenException(std::string("Celeborn partition pusher can't be null"));
    }
    // Resolve the pusher class and its method once per process. The previous code created a new
    // JNI global class reference on every call and never released it, leaking one global ref per
    // writer. Function-local statics are initialised thread-safely; if a lookup throws, it is
    // retried on the next call.
    static jclass celebornPartitionPusherClass =
        createGlobalClassReferenceOrError(env, "Lorg/apache/spark/shuffle/CelebornPartitionPusher;");
    static jmethodID celebornPushPartitionDataMethod =
        getMethodIdOrError(env, celebornPartitionPusherClass, "pushPartitionData", "(I[BI)I");
    JavaVM* vm = nullptr;
    if (env->GetJavaVM(&vm) != JNI_OK) {
      throw gluten::GlutenException("Unable to get JavaVM instance");
    }
    // NOTE(review): partitionPusher is a local reference here. CelebornClient must promote it to a
    // global reference if it uses it after this call returns. Confirm in CelebornClient.
    std::shared_ptr<CelebornClient> celebornClient =
        std::make_shared<CelebornClient>(vm, partitionPusher, celebornPushPartitionDataMethod);
    partitionWriter = std::make_unique<CelebornPartitionWriter>(
        numPartitions,
        std::move(partitionWriterOptions),
        memoryManager->getArrowMemoryPool(),
        std::move(celebornClient));
  } else {
    throw gluten::GlutenException("Unrecognizable partition writer type: " + partitionWriterType);
  }

  return ctx->objectStore()->save(ctx->createShuffleWriter(
      numPartitions, std::move(partitionWriter), std::move(shuffleWriterOptions), memoryManager));
  JNI_METHOD_END(kInvalidResourceHandle)
}