cpp/velox/jni/JniFileSystem.cc (384 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "JniFileSystem.h" #include "jni/JniCommon.h" #include "velox/common/io/IoStatistics.h" namespace { constexpr std::string_view kJniFsScheme("jni:"); constexpr std::string_view kJolFsScheme("jol:"); JavaVM* vm; jclass jniFileSystemClass; jclass jniReadFileClass; jclass jniWriteFileClass; jmethodID jniGetFileSystem; jmethodID jniIsCapableForNewFile; jmethodID jniFileSystemOpenFileForRead; jmethodID jniFileSystemOpenFileForWrite; jmethodID jniFileSystemRemove; jmethodID jniFileSystemRename; jmethodID jniFileSystemExists; jmethodID jniFileSystemList; jmethodID jniFileSystemMkdir; jmethodID jniFileSystemRmdir; jmethodID jniReadFilePread; jmethodID jniReadFileShouldCoalesce; jmethodID jniReadFileSize; jmethodID jniReadFileMemoryUsage; jmethodID jniReadFileGetNaturalReadSize; jmethodID jniReadFileClose; jmethodID jniWriteFileAppend; jmethodID jniWriteFileFlush; jmethodID jniWriteFileClose; jmethodID jniWriteFileSize; jstring createJString(JNIEnv* env, const std::string_view& path) { return env->NewStringUTF(std::string(path).c_str()); } std::string_view removePathSchema(std::string_view path) { unsigned long pos = path.find(':'); if (pos == std::string::npos) { return path; } return path.substr(pos + 1); } class JniReadFile : public facebook::velox::ReadFile { public: explicit JniReadFile(jobject obj) { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); obj_ = env->NewGlobalRef(obj); checkException(env); } ~JniReadFile() override { try { closeInternal(); JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); env->DeleteGlobalRef(obj_); checkException(env); } catch (const std::exception& e) { LOG(WARNING) << "Error closing jni read file " << e.what(); } } std::string_view pread( uint64_t offset, uint64_t length, void* buf, facebook::velox::filesystems::File::IoStats* stats = nullptr) const override { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); env->CallVoidMethod( obj_, jniReadFilePread, static_cast<jlong>(offset), static_cast<jlong>(length), reinterpret_cast<jlong>(buf)); checkException(env); return std::string_view(reinterpret_cast<const char*>(buf)); } bool shouldCoalesce() const override { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); jboolean out = env->CallBooleanMethod(obj_, jniReadFileShouldCoalesce); checkException(env); return out; } uint64_t size() const override { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); jlong out = env->CallLongMethod(obj_, jniReadFileSize); checkException(env); return static_cast<uint64_t>(out); } uint64_t memoryUsage() const override { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); jlong out = env->CallLongMethod(obj_, jniReadFileMemoryUsage); checkException(env); return static_cast<uint64_t>(out); } std::string getName() const override { return "<JniReadFile>"; } uint64_t getNaturalReadSize() const override { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); jlong out = env->CallLongMethod(obj_, jniReadFileGetNaturalReadSize); checkException(env); return static_cast<uint64_t>(out); } private: void closeInternal() { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); env->CallVoidMethod(obj_, jniReadFileClose); checkException(env); } jobject obj_; }; class JniWriteFile : public facebook::velox::WriteFile { public: explicit JniWriteFile(jobject obj) { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); obj_ = env->NewGlobalRef(obj); checkException(env); } ~JniWriteFile() override { try { closeInternal(); JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); env->DeleteGlobalRef(obj_); checkException(env); } catch (const std::exception& e) { LOG(WARNING) << "Error closing jni write file " << e.what(); } } void append(std::string_view data) override { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); const void* bytes = data.data(); unsigned long len = data.size(); env->CallVoidMethod(obj_, jniWriteFileAppend, static_cast<jlong>(len), reinterpret_cast<jlong>(bytes)); checkException(env); } void flush() override { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); env->CallVoidMethod(obj_, jniWriteFileFlush); checkException(env); } void close() override { closeInternal(); } uint64_t size() const override { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); jlong out = env->CallLongMethod(obj_, jniWriteFileSize); checkException(env); return static_cast<uint64_t>(out); } private: void closeInternal() { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); env->CallVoidMethod(obj_, jniWriteFileClose); checkException(env); } jobject obj_; }; // Convert "xxx:/a/b/c" to "/a/b/c". Probably it's Velox's job to remove the protocol when calling the member // functions? class FileSystemWrapper : public facebook::velox::filesystems::FileSystem { public: static std::shared_ptr<facebook::velox::filesystems::FileSystem> wrap( std::shared_ptr<facebook::velox::filesystems::FileSystem> fs) { return std::shared_ptr<facebook::velox::filesystems::FileSystem>(new FileSystemWrapper(fs)); } std::string name() const override { return fs_->name(); } std::unique_ptr<facebook::velox::ReadFile> openFileForRead( std::string_view path, const facebook::velox::filesystems::FileOptions& options) override { return fs_->openFileForRead(rewrite(path), options); } std::unique_ptr<facebook::velox::WriteFile> openFileForWrite( std::string_view path, const facebook::velox::filesystems::FileOptions& options) override { return fs_->openFileForWrite(rewrite(path), options); } void remove(std::string_view path) override { fs_->remove(rewrite(path)); } void rename(std::string_view oldPath, std::string_view newPath, bool overwrite) override { fs_->rename(rewrite(oldPath), rewrite(newPath), overwrite); } bool exists(std::string_view path) override { return fs_->exists(rewrite(path)); } std::vector<std::string> list(std::string_view path) override { return fs_->list(rewrite(path)); } void mkdir(std::string_view path, const facebook::velox::filesystems::DirectoryOptions& options = {}) override { fs_->mkdir(rewrite(path)); } void rmdir(std::string_view path) override { fs_->rmdir(rewrite(path)); } private: FileSystemWrapper(std::shared_ptr<facebook::velox::filesystems::FileSystem> fs) : FileSystem({}), fs_(fs) {} static std::string_view rewrite(std::string_view path) { return removePathSchema(path); } std::shared_ptr<facebook::velox::filesystems::FileSystem> fs_; }; class JniFileSystem : public facebook::velox::filesystems::FileSystem { public: explicit JniFileSystem(jobject obj, std::shared_ptr<const facebook::velox::config::ConfigBase> config) : FileSystem(config) { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); obj_ = env->NewGlobalRef(obj); checkException(env); } ~JniFileSystem() override { try { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); env->DeleteGlobalRef(obj_); checkException(env); } catch (const std::exception& e) { LOG(WARNING) << "Error releasing jni file system " << e.what(); } } std::string name() const override { return "JNI FS"; } std::unique_ptr<facebook::velox::ReadFile> openFileForRead( std::string_view path, const facebook::velox::filesystems::FileOptions& options) override { GLUTEN_CHECK( options.values.empty(), "JniFileSystem::openFileForRead: file options is not empty, this is not currently supported"); JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); jobject obj = env->CallObjectMethod(obj_, jniFileSystemOpenFileForRead, createJString(env, path)); checkException(env); auto out = std::make_unique<JniReadFile>(obj); return out; } std::unique_ptr<facebook::velox::WriteFile> openFileForWrite( std::string_view path, const facebook::velox::filesystems::FileOptions& options) override { GLUTEN_CHECK( options.values.empty(), "JniFileSystem::openFileForWrite: file options is not empty, this is not currently supported"); JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); jobject obj = env->CallObjectMethod(obj_, jniFileSystemOpenFileForWrite, createJString(env, path)); checkException(env); auto out = std::make_unique<JniWriteFile>(obj); return out; } void remove(std::string_view path) override { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); env->CallVoidMethod(obj_, jniFileSystemRemove, createJString(env, path)); checkException(env); } void rename(std::string_view oldPath, std::string_view newPath, bool overwrite) override { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); env->CallVoidMethod(obj_, jniFileSystemRename, createJString(env, oldPath), createJString(env, newPath), overwrite); checkException(env); } bool exists(std::string_view path) override { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); bool out = env->CallBooleanMethod(obj_, jniFileSystemExists, createJString(env, path)); checkException(env); return out; } std::vector<std::string> list(std::string_view path) override { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); std::vector<std::string> out; jobjectArray jarray = static_cast<jobjectArray>(env->CallObjectMethod(obj_, jniFileSystemList, createJString(env, path))); checkException(env); jsize length = env->GetArrayLength(jarray); for (jsize i = 0; i < length; ++i) { jstring element = static_cast<jstring>(env->GetObjectArrayElement(jarray, i)); std::string cElement = jStringToCString(env, element); out.push_back(cElement); } return out; } void mkdir(std::string_view path, const facebook::velox::filesystems::DirectoryOptions& options = {}) override { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); env->CallVoidMethod(obj_, jniFileSystemMkdir, createJString(env, path)); checkException(env); } void rmdir(std::string_view path) override { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); env->CallVoidMethod(obj_, jniFileSystemRmdir, createJString(env, path)); checkException(env); } static bool isCapableForNewFile(uint64_t size) { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); bool out = env->CallStaticBooleanMethod(jniFileSystemClass, jniIsCapableForNewFile, static_cast<jlong>(size)); checkException(env); return out; } static std::function<bool(std::string_view)> schemeMatcher() { return [](std::string_view filePath) { return filePath.find(kJniFsScheme) == 0; }; } static std::function< std::shared_ptr<FileSystem>(std::shared_ptr<const facebook::velox::config::ConfigBase>, std::string_view)> fileSystemGenerator() { return [](std::shared_ptr<const facebook::velox::config::ConfigBase> properties, std::string_view filePath) { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm, &env); jobject obj = env->CallStaticObjectMethod(jniFileSystemClass, jniGetFileSystem); checkException(env); // remove "jni:" or "jol:" prefix. std::shared_ptr<FileSystem> lfs = FileSystemWrapper::wrap(std::make_shared<JniFileSystem>(obj, properties)); return lfs; }; } private: jobject obj_; }; } // namespace void gluten::initVeloxJniFileSystem(JNIEnv* env) { // vm if (env->GetJavaVM(&vm) != JNI_OK) { throw gluten::GlutenException("Unable to get JavaVM instance"); } // classes jniFileSystemClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/fs/JniFilesystem;"); jniReadFileClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/fs/JniFilesystem$ReadFile;"); jniWriteFileClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/fs/JniFilesystem$WriteFile;"); // methods in JniFilesystem jniGetFileSystem = getStaticMethodIdOrError(env, jniFileSystemClass, "getFileSystem", "()Lorg/apache/gluten/fs/JniFilesystem;"); jniIsCapableForNewFile = getStaticMethodIdOrError(env, jniFileSystemClass, "isCapableForNewFile", "(J)Z"); jniFileSystemOpenFileForRead = getMethodIdOrError( env, jniFileSystemClass, "openFileForRead", "(Ljava/lang/String;)Lorg/apache/gluten/fs/JniFilesystem$ReadFile;"); jniFileSystemOpenFileForWrite = getMethodIdOrError( env, jniFileSystemClass, "openFileForWrite", "(Ljava/lang/String;)Lorg/apache/gluten/fs/JniFilesystem$WriteFile;"); jniFileSystemRemove = getMethodIdOrError(env, jniFileSystemClass, "remove", "(Ljava/lang/String;)V"); jniFileSystemRename = getMethodIdOrError(env, jniFileSystemClass, "rename", "(Ljava/lang/String;Ljava/lang/String;Z)V"); jniFileSystemExists = getMethodIdOrError(env, jniFileSystemClass, "exists", "(Ljava/lang/String;)Z"); jniFileSystemList = getMethodIdOrError(env, jniFileSystemClass, "list", "(Ljava/lang/String;)[Ljava/lang/String;"); jniFileSystemMkdir = getMethodIdOrError(env, jniFileSystemClass, "mkdir", "(Ljava/lang/String;)V"); jniFileSystemRmdir = getMethodIdOrError(env, jniFileSystemClass, "rmdir", "(Ljava/lang/String;)V"); // methods in JniFilesystem$ReadFile jniReadFilePread = getMethodIdOrError(env, jniReadFileClass, "pread", "(JJJ)V"); jniReadFileShouldCoalesce = getMethodIdOrError(env, jniReadFileClass, "shouldCoalesce", "()Z"); jniReadFileSize = getMethodIdOrError(env, jniReadFileClass, "size", "()J"); jniReadFileMemoryUsage = getMethodIdOrError(env, jniReadFileClass, "memoryUsage", "()J"); jniReadFileGetNaturalReadSize = getMethodIdOrError(env, jniReadFileClass, "getNaturalReadSize", "()J"); jniReadFileClose = getMethodIdOrError(env, jniReadFileClass, "close", "()V"); // methods in JniFilesystem$WriteFile jniWriteFileAppend = getMethodIdOrError(env, jniWriteFileClass, "append", "(JJ)V"); jniWriteFileFlush = getMethodIdOrError(env, jniWriteFileClass, "flush", "()V"); jniWriteFileClose = getMethodIdOrError(env, jniWriteFileClass, "close", "()V"); jniWriteFileSize = getMethodIdOrError(env, jniWriteFileClass, "size", "()J"); } void gluten::finalizeVeloxJniFileSystem(JNIEnv* env) { env->DeleteGlobalRef(jniWriteFileClass); env->DeleteGlobalRef(jniReadFileClass); env->DeleteGlobalRef(jniFileSystemClass); vm = nullptr; } // "jol" stands for letting Gluten choose between jni fs and local fs. // This doesn't implement facebook::velox::filesystems::FileSystem since it just // act as a entry-side router to create JniFilesystem and LocalFilesystem void gluten::registerJolFileSystem(uint64_t maxFileSize) { GLUTEN_CHECK(maxFileSize > 0, "Unexpected max file size for jol fs: " + std::to_string(maxFileSize)); auto JolSchemeMatcher = [](std::string_view filePath) { return filePath.find(kJolFsScheme) == 0; }; auto fileSystemGenerator = [maxFileSize]( std::shared_ptr<const facebook::velox::config::ConfigBase> properties, std::string_view filePath) -> std::shared_ptr<facebook::velox::filesystems::FileSystem> { // select JNI file if there is enough space if (JniFileSystem::isCapableForNewFile(maxFileSize)) { return JniFileSystem::fileSystemGenerator()(properties, filePath); } // otherwise select local file // remove "jol:" to make Velox choose local fs. auto localFilePath = removePathSchema(filePath); auto fs = FileSystemWrapper::wrap(facebook::velox::filesystems::getFileSystem(localFilePath, properties)); return fs; }; facebook::velox::filesystems::registerFileSystem(JolSchemeMatcher, fileSystemGenerator); }