/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "JniFileSystem.h"
#include "jni/JniCommon.h"

namespace {
// Path prefix tested by JniFileSystem::schemeMatcher().
constexpr std::string_view kJniFsScheme("jni:");
// Path prefix tested by the matcher registered in gluten::registerJolFileSystem().
constexpr std::string_view kJolFsScheme("jol:");

// JavaVM obtained in gluten::initVeloxJniFileSystem() and reset to nullptr in
// gluten::finalizeVeloxJniFileSystem(). Every JNI call in this file attaches
// the calling thread through this pointer before using a JNIEnv.
JavaVM* vm;

// Global class references created in gluten::initVeloxJniFileSystem() and
// deleted in gluten::finalizeVeloxJniFileSystem().
jclass jniFileSystemClass;
jclass jniReadFileClass;
jclass jniWriteFileClass;

// Method IDs looked up on jniFileSystemClass. The first two are resolved as
// static methods, the rest as instance methods.
jmethodID jniGetFileSystem;
jmethodID jniIsCapableForNewFile;
jmethodID jniFileSystemOpenFileForRead;
jmethodID jniFileSystemOpenFileForWrite;
jmethodID jniFileSystemRemove;
jmethodID jniFileSystemRename;
jmethodID jniFileSystemExists;
jmethodID jniFileSystemList;
jmethodID jniFileSystemMkdir;
jmethodID jniFileSystemRmdir;

// Instance method IDs looked up on jniReadFileClass; used by JniReadFile.
jmethodID jniReadFilePread;
jmethodID jniReadFileShouldCoalesce;
jmethodID jniReadFileSize;
jmethodID jniReadFileMemoryUsage;
jmethodID jniReadFileGetNaturalReadSize;
jmethodID jniReadFileClose;

// Instance method IDs looked up on jniWriteFileClass; used by JniWriteFile.
jmethodID jniWriteFileAppend;
jmethodID jniWriteFileFlush;
jmethodID jniWriteFileClose;
jmethodID jniWriteFileSize;

// Builds a Java string from `path` through JNIEnv::NewStringUTF.
// A std::string_view carries no terminating NUL, so the bytes are first copied
// into a std::string whose c_str() can be handed to JNI.
// The returned value is whatever NewStringUTF yields for that buffer.
jstring createJString(JNIEnv* env, const std::string_view& path) {
  const std::string nulTerminated(path);
  jstring javaString = env->NewStringUTF(nulTerminated.c_str());
  return javaString;
}

// Drops everything up to and including the first ':' in `path`, e.g.
// "jni:/a/b/c" -> "/a/b/c". A path without any ':' is returned unchanged.
// The result is a view into the same storage as `path`; no copy is made.
std::string_view removePathSchema(std::string_view path) {
  // Keep the index in the view's own size_type: a narrower integer (such as a
  // 32-bit unsigned long) would truncate npos and miss the "not found" case.
  const std::string_view::size_type colon = path.find(':');
  if (colon == std::string_view::npos) {
    return path;
  }
  return path.substr(colon + 1);
}

class JniReadFile : public facebook::velox::ReadFile {
 public:
  explicit JniReadFile(jobject obj) {
    JNIEnv* env;
    attachCurrentThreadAsDaemonOrThrow(vm, &env);
    obj_ = env->NewGlobalRef(obj);
    checkException(env);
  }

  ~JniReadFile() override {
    try {
      close0();
      JNIEnv* env;
      attachCurrentThreadAsDaemonOrThrow(vm, &env);
      env->DeleteGlobalRef(obj_);
      checkException(env);
    } catch (const std::exception& e) {
      LOG(WARNING) << "Error closing jni read file " << e.what();
    }
  }

  std::string_view pread(uint64_t offset, uint64_t length, void* buf) const override {
    JNIEnv* env;
    attachCurrentThreadAsDaemonOrThrow(vm, &env);
    env->CallVoidMethod(
        obj_, jniReadFilePread, static_cast<jlong>(offset), static_cast<jlong>(length), reinterpret_cast<jlong>(buf));
    checkException(env);
    return std::string_view(reinterpret_cast<const char*>(buf));
  }

  bool shouldCoalesce() const override {
    JNIEnv* env;
    attachCurrentThreadAsDaemonOrThrow(vm, &env);
    jboolean out = env->CallBooleanMethod(obj_, jniReadFileShouldCoalesce);
    checkException(env);
    return out;
  }

  uint64_t size() const override {
    JNIEnv* env;
    attachCurrentThreadAsDaemonOrThrow(vm, &env);
    jlong out = env->CallLongMethod(obj_, jniReadFileSize);
    checkException(env);
    return static_cast<uint64_t>(out);
  }

  uint64_t memoryUsage() const override {
    JNIEnv* env;
    attachCurrentThreadAsDaemonOrThrow(vm, &env);
    jlong out = env->CallLongMethod(obj_, jniReadFileMemoryUsage);
    checkException(env);
    return static_cast<uint64_t>(out);
  }

  std::string getName() const override {
    return "<JniReadFile>";
  }

  uint64_t getNaturalReadSize() const override {
    JNIEnv* env;
    attachCurrentThreadAsDaemonOrThrow(vm, &env);
    jlong out = env->CallLongMethod(obj_, jniReadFileGetNaturalReadSize);
    checkException(env);
    return static_cast<uint64_t>(out);
  }

 private:
  void close0() {
    JNIEnv* env;
    attachCurrentThreadAsDaemonOrThrow(vm, &env);
    env->CallVoidMethod(obj_, jniReadFileClose);
    checkException(env);
  }

  jobject obj_;
};

class JniWriteFile : public facebook::velox::WriteFile {
 public:
  explicit JniWriteFile(jobject obj) {
    JNIEnv* env;
    attachCurrentThreadAsDaemonOrThrow(vm, &env);
    obj_ = env->NewGlobalRef(obj);
    checkException(env);
  }

  ~JniWriteFile() override {
    try {
      close0();
      JNIEnv* env;
      attachCurrentThreadAsDaemonOrThrow(vm, &env);
      env->DeleteGlobalRef(obj_);
      checkException(env);
    } catch (const std::exception& e) {
      LOG(WARNING) << "Error closing jni write file " << e.what();
    }
  }

  void append(std::string_view data) override {
    JNIEnv* env;
    attachCurrentThreadAsDaemonOrThrow(vm, &env);
    const void* bytes = data.data();
    unsigned long len = data.size();
    env->CallVoidMethod(obj_, jniWriteFileAppend, static_cast<jlong>(len), reinterpret_cast<jlong>(bytes));
    checkException(env);
  }

  void flush() override {
    JNIEnv* env;
    attachCurrentThreadAsDaemonOrThrow(vm, &env);
    env->CallVoidMethod(obj_, jniWriteFileFlush);
    checkException(env);
  }

  void close() override {
    close0();
  }

  uint64_t size() const override {
    JNIEnv* env;
    attachCurrentThreadAsDaemonOrThrow(vm, &env);
    jlong out = env->CallLongMethod(obj_, jniWriteFileSize);
    checkException(env);
    return static_cast<uint64_t>(out);
  }

 private:
  void close0() {
    JNIEnv* env;
    attachCurrentThreadAsDaemonOrThrow(vm, &env);
    env->CallVoidMethod(obj_, jniWriteFileClose);
    checkException(env);
  }

  jobject obj_;
};

// Convert "xxx:/a/b/c" to "/a/b/c". Probably it's Velox's job to remove the protocol when calling the member
// functions?
class FileSystemWrapper : public facebook::velox::filesystems::FileSystem {
 public:
  static std::shared_ptr<facebook::velox::filesystems::FileSystem> wrap(
      std::shared_ptr<facebook::velox::filesystems::FileSystem> fs) {
    return std::shared_ptr<facebook::velox::filesystems::FileSystem>(new FileSystemWrapper(fs));
  }

  std::string name() const override {
    return fs_->name();
  }

  std::unique_ptr<facebook::velox::ReadFile> openFileForRead(
      std::string_view path,
      const facebook::velox::filesystems::FileOptions& options) override {
    return fs_->openFileForRead(rewrite(path), options);
  }

  std::unique_ptr<facebook::velox::WriteFile> openFileForWrite(
      std::string_view path,
      const facebook::velox::filesystems::FileOptions& options) override {
    return fs_->openFileForWrite(rewrite(path), options);
  }

  void remove(std::string_view path) override {
    fs_->remove(rewrite(path));
  }

  void rename(std::string_view oldPath, std::string_view newPath, bool overwrite) override {
    fs_->rename(rewrite(oldPath), rewrite(newPath), overwrite);
  }

  bool exists(std::string_view path) override {
    return fs_->exists(rewrite(path));
  }

  std::vector<std::string> list(std::string_view path) override {
    return fs_->list(rewrite(path));
  }

  void mkdir(std::string_view path) override {
    fs_->mkdir(rewrite(path));
  }

  void rmdir(std::string_view path) override {
    fs_->rmdir(rewrite(path));
  }

 private:
  FileSystemWrapper(std::shared_ptr<facebook::velox::filesystems::FileSystem> fs) : FileSystem({}), fs_(fs) {}

  static std::string_view rewrite(std::string_view path) {
    return removePathSchema(path);
  }

  std::shared_ptr<facebook::velox::filesystems::FileSystem> fs_;
};

class JniFileSystem : public facebook::velox::filesystems::FileSystem {
 public:
  explicit JniFileSystem(jobject obj, std::shared_ptr<const facebook::velox::Config> config) : FileSystem(config) {
    JNIEnv* env;
    attachCurrentThreadAsDaemonOrThrow(vm, &env);
    obj_ = env->NewGlobalRef(obj);
    checkException(env);
  }

  ~JniFileSystem() override {
    try {
      JNIEnv* env;
      attachCurrentThreadAsDaemonOrThrow(vm, &env);
      env->DeleteGlobalRef(obj_);
      checkException(env);
    } catch (const std::exception& e) {
      LOG(WARNING) << "Error releasing jni file system " << e.what();
    }
  }

  std::string name() const override {
    return "JNI FS";
  }

  std::unique_ptr<facebook::velox::ReadFile> openFileForRead(
      std::string_view path,
      const facebook::velox::filesystems::FileOptions& options) override {
    GLUTEN_CHECK(
        options.values.empty(),
        "JniFileSystem::openFileForRead: file options is not empty, this is not currently supported");
    JNIEnv* env;
    attachCurrentThreadAsDaemonOrThrow(vm, &env);
    jobject obj = env->CallObjectMethod(obj_, jniFileSystemOpenFileForRead, createJString(env, path));
    checkException(env);
    auto out = std::make_unique<JniReadFile>(obj);
    return out;
  }

  std::unique_ptr<facebook::velox::WriteFile> openFileForWrite(
      std::string_view path,
      const facebook::velox::filesystems::FileOptions& options) override {
    GLUTEN_CHECK(
        options.values.empty(),
        "JniFileSystem::openFileForWrite: file options is not empty, this is not currently supported");
    JNIEnv* env;
    attachCurrentThreadAsDaemonOrThrow(vm, &env);
    jobject obj = env->CallObjectMethod(obj_, jniFileSystemOpenFileForWrite, createJString(env, path));
    checkException(env);
    auto out = std::make_unique<JniWriteFile>(obj);
    return out;
  }

  void remove(std::string_view path) override {
    JNIEnv* env;
    attachCurrentThreadAsDaemonOrThrow(vm, &env);
    env->CallVoidMethod(obj_, jniFileSystemRemove, createJString(env, path));
    checkException(env);
  }

  void rename(std::string_view oldPath, std::string_view newPath, bool overwrite) override {
    JNIEnv* env;
    attachCurrentThreadAsDaemonOrThrow(vm, &env);
    env->CallVoidMethod(obj_, jniFileSystemRename, createJString(env, oldPath), createJString(env, newPath), overwrite);
    checkException(env);
  }

  bool exists(std::string_view path) override {
    JNIEnv* env;
    attachCurrentThreadAsDaemonOrThrow(vm, &env);
    bool out = env->CallBooleanMethod(obj_, jniFileSystemExists, createJString(env, path));
    checkException(env);
    return out;
  }

  std::vector<std::string> list(std::string_view path) override {
    JNIEnv* env;
    attachCurrentThreadAsDaemonOrThrow(vm, &env);
    std::vector<std::string> out;
    jobjectArray jarray = (jobjectArray)env->CallObjectMethod(obj_, jniFileSystemList, createJString(env, path));
    checkException(env);
    jsize length = env->GetArrayLength(jarray);
    for (jsize i = 0; i < length; ++i) {
      jstring element = (jstring)env->GetObjectArrayElement(jarray, i);
      std::string cElement = jStringToCString(env, element);
      out.push_back(cElement);
    }
    return out;
  }

  void mkdir(std::string_view path) override {
    JNIEnv* env;
    attachCurrentThreadAsDaemonOrThrow(vm, &env);
    env->CallVoidMethod(obj_, jniFileSystemMkdir, createJString(env, path));
    checkException(env);
  }

  void rmdir(std::string_view path) override {
    JNIEnv* env;
    attachCurrentThreadAsDaemonOrThrow(vm, &env);
    env->CallVoidMethod(obj_, jniFileSystemRmdir, createJString(env, path));
    checkException(env);
  }

  static bool isCapableForNewFile(uint64_t size) {
    JNIEnv* env;
    attachCurrentThreadAsDaemonOrThrow(vm, &env);
    bool out = env->CallStaticBooleanMethod(jniFileSystemClass, jniIsCapableForNewFile, static_cast<jlong>(size));
    checkException(env);
    return out;
  }

  static std::function<bool(std::string_view)> schemeMatcher() {
    return [](std::string_view filePath) { return filePath.find(kJniFsScheme) == 0; };
  }

  static std::function<std::shared_ptr<FileSystem>(std::shared_ptr<const facebook::velox::Config>, std::string_view)>
  fileSystemGenerator() {
    return [](std::shared_ptr<const facebook::velox::Config> properties, std::string_view filePath) {
      JNIEnv* env;
      attachCurrentThreadAsDaemonOrThrow(vm, &env);
      jobject obj = env->CallStaticObjectMethod(jniFileSystemClass, jniGetFileSystem);
      checkException(env);
      // remove "jni:" or "jol:" prefix.
      std::shared_ptr<FileSystem> lfs = FileSystemWrapper::wrap(std::make_shared<JniFileSystem>(obj, properties));
      return lfs;
    };
  }

 private:
  jobject obj_;
};
} // namespace

// Populates the file-local JNI caches: the JavaVM, one global class reference
// per Java type, and every method ID used by the classes in this file.
// Throws gluten::GlutenException when the JavaVM cannot be obtained; the
// *OrError helpers are expected to report failed lookups themselves.
void gluten::initVeloxJniFileSystem(JNIEnv* env) {
  // vm
  const jint vmStatus = env->GetJavaVM(&vm);
  if (vmStatus != JNI_OK) {
    throw gluten::GlutenException("Unable to get JavaVM instance");
  }

  // classes
  jniFileSystemClass = createGlobalClassReferenceOrError(env, "Lio/glutenproject/fs/JniFilesystem;");
  jniReadFileClass = createGlobalClassReferenceOrError(env, "Lio/glutenproject/fs/JniFilesystem$ReadFile;");
  jniWriteFileClass = createGlobalClassReferenceOrError(env, "Lio/glutenproject/fs/JniFilesystem$WriteFile;");

  // Per-class shorthands for instance-method lookups. The class globals are
  // read when a shorthand is invoked, i.e. after the assignments above.
  auto fsMethod = [env](const char* name, const char* signature) {
    return getMethodIdOrError(env, jniFileSystemClass, name, signature);
  };
  auto readMethod = [env](const char* name, const char* signature) {
    return getMethodIdOrError(env, jniReadFileClass, name, signature);
  };
  auto writeMethod = [env](const char* name, const char* signature) {
    return getMethodIdOrError(env, jniWriteFileClass, name, signature);
  };

  // methods in JniFilesystem
  jniGetFileSystem =
      getStaticMethodIdOrError(env, jniFileSystemClass, "getFileSystem", "()Lio/glutenproject/fs/JniFilesystem;");
  jniIsCapableForNewFile = getStaticMethodIdOrError(env, jniFileSystemClass, "isCapableForNewFile", "(J)Z");
  jniFileSystemOpenFileForRead =
      fsMethod("openFileForRead", "(Ljava/lang/String;)Lio/glutenproject/fs/JniFilesystem$ReadFile;");
  jniFileSystemOpenFileForWrite =
      fsMethod("openFileForWrite", "(Ljava/lang/String;)Lio/glutenproject/fs/JniFilesystem$WriteFile;");
  jniFileSystemRemove = fsMethod("remove", "(Ljava/lang/String;)V");
  jniFileSystemRename = fsMethod("rename", "(Ljava/lang/String;Ljava/lang/String;Z)V");
  jniFileSystemExists = fsMethod("exists", "(Ljava/lang/String;)Z");
  jniFileSystemList = fsMethod("list", "(Ljava/lang/String;)[Ljava/lang/String;");
  jniFileSystemMkdir = fsMethod("mkdir", "(Ljava/lang/String;)V");
  jniFileSystemRmdir = fsMethod("rmdir", "(Ljava/lang/String;)V");

  // methods in JniFilesystem$ReadFile
  jniReadFilePread = readMethod("pread", "(JJJ)V");
  jniReadFileShouldCoalesce = readMethod("shouldCoalesce", "()Z");
  jniReadFileSize = readMethod("size", "()J");
  jniReadFileMemoryUsage = readMethod("memoryUsage", "()J");
  jniReadFileGetNaturalReadSize = readMethod("getNaturalReadSize", "()J");
  jniReadFileClose = readMethod("close", "()V");

  // methods in JniFilesystem$WriteFile
  jniWriteFileAppend = writeMethod("append", "(JJ)V");
  jniWriteFileFlush = writeMethod("flush", "()V");
  jniWriteFileClose = writeMethod("close", "()V");
  jniWriteFileSize = writeMethod("size", "()J");
}

// Drops the global class references taken by gluten::initVeloxJniFileSystem()
// and forgets the cached JavaVM. The references are deleted in the order
// write-file, read-file, file-system.
void gluten::finalizeVeloxJniFileSystem(JNIEnv* env) {
  const jclass cachedClasses[] = {jniWriteFileClass, jniReadFileClass, jniFileSystemClass};
  for (const jclass cachedClass : cachedClasses) {
    env->DeleteGlobalRef(cachedClass);
  }

  vm = nullptr;
}

// "jol" stands for letting Gluten choose between jni fs and local fs.
// This doesn't implement facebook::velox::filesystems::FileSystem since it just
// act as a entry-side router to create JniFilesystem and LocalFilesystem
void gluten::registerJolFileSystem(uint64_t maxFileSize) {
  GLUTEN_CHECK(maxFileSize > 0, "Unexpected max file size for jol fs: " + std::to_string(maxFileSize));

  auto JolSchemeMatcher = [](std::string_view filePath) { return filePath.find(kJolFsScheme) == 0; };

  auto fileSystemGenerator =
      [maxFileSize](
          std::shared_ptr<const facebook::velox::Config> properties,
          std::string_view filePath) -> std::shared_ptr<facebook::velox::filesystems::FileSystem> {
    // select JNI file if there is enough space
    if (JniFileSystem::isCapableForNewFile(maxFileSize)) {
      return JniFileSystem::fileSystemGenerator()(properties, filePath);
    }

    // otherwise select local file
    // remove "jol:" to make Velox choose local fs.
    auto localFilePath = removePathSchema(filePath);
    auto fs = FileSystemWrapper::wrap(facebook::velox::filesystems::getFileSystem(localFilePath, properties));
    return fs;
  };

  facebook::velox::filesystems::registerFileSystem(JolSchemeMatcher, fileSystemGenerator);
}
