clients/filesystem-fuse/src/gravitino_fileset_filesystem.rs (224 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use crate::config::AppConfig;
use crate::filesystem::{FileStat, FileSystemCapacity, FileSystemContext, PathFileSystem, Result};
use crate::gravitino_client::GravitinoClient;
use crate::opened_file::{OpenFileFlags, OpenedFile};
use async_trait::async_trait;
use fuse3::{Errno, FileType};
use std::path::{Path, PathBuf};
/// `GravitinoFilesetFileSystem` is a filesystem that is associated with a fileset in Gravitino.
/// It maps fileset paths to paths in the original data storage and delegates the operations
/// to the inner filesystem, such as S3, GCS, or JuiceFS.
pub(crate) struct GravitinoFilesetFileSystem {
// The inner (physical) filesystem that the operations are delegated to.
physical_fs: Box<dyn PathFileSystem>,
// The Gravitino client supplied when the filesystem is constructed.
client: GravitinoClient,
// `location` is an absolute path in the physical filesystem that is associated with the fileset.
// e.g. for the fileset location s3://bucket/path/to/file, the location is /path/to/file
location: PathBuf,
}
impl GravitinoFilesetFileSystem {
    /// Creates a fileset filesystem on top of the physical filesystem `fs`.
    ///
    /// * `fs` - the physical filesystem that operations are delegated to.
    /// * `target_path` - the path inside `fs` that becomes the fileset root (`location`).
    /// * `client` - the Gravitino client stored alongside the filesystem.
    /// * `_config`, `_context` - accepted as part of the signature but not used here.
    pub async fn new(
        fs: Box<dyn PathFileSystem>,
        target_path: &Path,
        client: GravitinoClient,
        _config: &AppConfig,
        _context: &FileSystemContext,
    ) -> Self {
        Self {
            physical_fs: fs,
            client,
            location: target_path.into(),
        }
    }

    /// Maps a path inside the fileset (gvfs path) to the matching physical path.
    ///
    /// The fileset root `/` maps to `location` itself; every other path is
    /// joined below `location`.
    ///
    /// # Panics
    ///
    /// Panics if `path` does not start with `/`.
    fn gvfs_path_to_raw_path(&self, path: &Path) -> PathBuf {
        let relation_path = path.strip_prefix("/").expect("path should start with /");
        // The fileset root has nothing left after stripping "/", so it maps to `location`.
        if relation_path == Path::new("") {
            return self.location.clone();
        }
        self.location.join(relation_path)
    }

    /// Maps a physical path back to the path inside the fileset (gvfs path).
    ///
    /// The `location` prefix is removed and the remainder is re-rooted at `/`.
    ///
    /// # Errors
    ///
    /// Returns `EBADF` when `path` is not located under `location`.
    fn raw_path_to_gvfs_path(&self, path: &Path) -> Result<PathBuf> {
        let stripped_path = path
            .strip_prefix(&self.location)
            .map_err(|_| Errno::from(libc::EBADF))?;
        Ok(Path::new("/").join(stripped_path))
    }
}
#[async_trait]
impl PathFileSystem for GravitinoFilesetFileSystem {
    /// Initializes the physical filesystem.
    async fn init(&self) -> Result<()> {
        self.physical_fs.init().await
    }

    /// Stats `path` on the physical filesystem and returns the result with its
    /// path translated back into the fileset namespace.
    async fn stat(&self, path: &Path, kind: FileType) -> Result<FileStat> {
        let physical_path = self.gvfs_path_to_raw_path(path);
        let mut stat = self.physical_fs.stat(&physical_path, kind).await?;
        let gvfs_path = self.raw_path_to_gvfs_path(&stat.path)?;
        stat.path = gvfs_path;
        Ok(stat)
    }

    /// Looks up `path` on the physical filesystem and returns the result with
    /// its path translated back into the fileset namespace.
    async fn lookup(&self, path: &Path) -> Result<FileStat> {
        let physical_path = self.gvfs_path_to_raw_path(path);
        let mut stat = self.physical_fs.lookup(&physical_path).await?;
        let gvfs_path = self.raw_path_to_gvfs_path(&stat.path)?;
        stat.path = gvfs_path;
        Ok(stat)
    }

    /// Lists the directory `path`; every returned entry carries a fileset path.
    async fn read_dir(&self, path: &Path) -> Result<Vec<FileStat>> {
        let physical_path = self.gvfs_path_to_raw_path(path);
        let mut entries = self.physical_fs.read_dir(&physical_path).await?;
        for entry in &mut entries {
            let gvfs_path = self.raw_path_to_gvfs_path(&entry.path)?;
            entry.path = gvfs_path;
        }
        Ok(entries)
    }

    /// Opens the file at `path`; the returned handle reports a fileset path.
    async fn open_file(&self, path: &Path, flags: OpenFileFlags) -> Result<OpenedFile> {
        let physical_path = self.gvfs_path_to_raw_path(path);
        let mut handle = self.physical_fs.open_file(&physical_path, flags).await?;
        let gvfs_path = self.raw_path_to_gvfs_path(&handle.file_stat.path)?;
        handle.file_stat.path = gvfs_path;
        Ok(handle)
    }

    /// Opens the directory at `path`; the returned handle reports a fileset path.
    async fn open_dir(&self, path: &Path, flags: OpenFileFlags) -> Result<OpenedFile> {
        let physical_path = self.gvfs_path_to_raw_path(path);
        let mut handle = self.physical_fs.open_dir(&physical_path, flags).await?;
        let gvfs_path = self.raw_path_to_gvfs_path(&handle.file_stat.path)?;
        handle.file_stat.path = gvfs_path;
        Ok(handle)
    }

    /// Creates a file at `path`; the returned handle reports a fileset path.
    async fn create_file(&self, path: &Path, flags: OpenFileFlags) -> Result<OpenedFile> {
        let physical_path = self.gvfs_path_to_raw_path(path);
        let mut handle = self
            .physical_fs
            .create_file(&physical_path, flags)
            .await?;
        let gvfs_path = self.raw_path_to_gvfs_path(&handle.file_stat.path)?;
        handle.file_stat.path = gvfs_path;
        Ok(handle)
    }

    /// Creates a directory at `path`; the returned stat reports a fileset path.
    async fn create_dir(&self, path: &Path) -> Result<FileStat> {
        let physical_path = self.gvfs_path_to_raw_path(path);
        let mut stat = self.physical_fs.create_dir(&physical_path).await?;
        let gvfs_path = self.raw_path_to_gvfs_path(&stat.path)?;
        stat.path = gvfs_path;
        Ok(stat)
    }

    /// Forwards the attribute update to the physical filesystem.
    ///
    /// Only `path` is translated; `file_stat` is passed through unchanged.
    async fn set_attr(&self, path: &Path, file_stat: &FileStat, flush: bool) -> Result<()> {
        self.physical_fs
            .set_attr(&self.gvfs_path_to_raw_path(path), file_stat, flush)
            .await
    }

    /// Removes the file at `path` from the physical filesystem.
    async fn remove_file(&self, path: &Path) -> Result<()> {
        self.physical_fs
            .remove_file(&self.gvfs_path_to_raw_path(path))
            .await
    }

    /// Removes the directory at `path` from the physical filesystem.
    async fn remove_dir(&self, path: &Path) -> Result<()> {
        self.physical_fs
            .remove_dir(&self.gvfs_path_to_raw_path(path))
            .await
    }

    /// Reports the capacity of the physical filesystem.
    fn get_capacity(&self) -> Result<FileSystemCapacity> {
        self.physical_fs.get_capacity()
    }
}
#[cfg(test)]
mod tests {
    use crate::config::{AppConfig, GravitinoConfig};
    use crate::default_raw_filesystem::DefaultRawFileSystem;
    use crate::filesystem::tests::{TestPathFileSystem, TestRawFileSystem};
    use crate::filesystem::{FileSystemContext, PathFileSystem, RawFileSystem};
    use crate::gravitino_client::tests::{create_test_catalog, create_test_fileset};
    use crate::gravitino_client::GravitinoClient;
    use crate::gravitino_fileset_filesystem::GravitinoFilesetFileSystem;
    use crate::gvfs_creator::create_fs_with_fileset;
    use crate::memory_filesystem::MemoryFileSystem;
    use crate::s3_filesystem::extract_s3_config;
    use crate::s3_filesystem::tests::{cleanup_s3_fs, s3_test_config};
    use crate::test_enable_with;
    use crate::RUN_TEST_WITH_S3;
    use std::collections::HashMap;
    use std::path::Path;

    /// Builds a fileset filesystem located at `/c1/fileset1` on top of a memory filesystem.
    async fn memory_backed_fileset_fs() -> GravitinoFilesetFileSystem {
        GravitinoFilesetFileSystem {
            physical_fs: Box::new(MemoryFileSystem::new().await),
            client: GravitinoClient::new(&GravitinoConfig::default()),
            location: "/c1/fileset1".into(),
        }
    }

    /// Fileset paths are mapped below the fileset location; the root maps to the location.
    #[tokio::test]
    async fn test_map_fileset_path_to_raw_path() {
        let fileset_fs = memory_backed_fileset_fs().await;

        let child = fileset_fs.gvfs_path_to_raw_path(Path::new("/a"));
        assert_eq!(child, Path::new("/c1/fileset1/a"));

        let root = fileset_fs.gvfs_path_to_raw_path(Path::new("/"));
        assert_eq!(root, Path::new("/c1/fileset1"));
    }

    /// Physical paths are mapped back to fileset paths; the location maps to the root.
    #[tokio::test]
    async fn test_map_raw_path_to_fileset_path() {
        let fileset_fs = memory_backed_fileset_fs().await;

        let child = fileset_fs
            .raw_path_to_gvfs_path(Path::new("/c1/fileset1/a"))
            .unwrap();
        assert_eq!(child, Path::new("/a"));

        let root = fileset_fs
            .raw_path_to_gvfs_path(Path::new("/c1/fileset1"))
            .unwrap();
        assert_eq!(root, Path::new("/"));
    }

    /// Cleans up `path` on S3, then builds a fileset filesystem located at `path`
    /// using the S3 settings extracted from `config`.
    async fn create_fileset_fs(path: &Path, config: &AppConfig) -> GravitinoFilesetFileSystem {
        let s3_config = extract_s3_config(config);
        cleanup_s3_fs(path, &s3_config).await;

        let bucket = s3_config.get("bucket").expect("Bucket must exist");
        let endpoint = s3_config.get("endpoint").expect("Endpoint must exist");

        let catalog_properties: HashMap<String, String> = HashMap::from([
            ("location".to_string(), format!("s3a://{}", bucket)),
            ("s3-endpoint".to_string(), endpoint.to_string()),
        ]);
        let catalog = create_test_catalog("c1", "s3", catalog_properties);

        let fileset_location = format!("s3a://{}{}", bucket, path.to_string_lossy());
        let fileset = create_test_fileset("fileset1", &fileset_location);

        let context = FileSystemContext::default();
        let physical_fs = create_fs_with_fileset(&catalog, &fileset, config, &context)
            .await
            .unwrap();

        let client = GravitinoClient::new(&config.gravitino);
        GravitinoFilesetFileSystem::new(physical_fs, path, client, config, &context).await
    }

    /// Runs the path-filesystem test suite against an S3-backed fileset
    /// (gated by `test_enable_with!(RUN_TEST_WITH_S3)`).
    #[tokio::test]
    async fn s3_ut_test_fileset_file_system() {
        test_enable_with!(RUN_TEST_WITH_S3);

        let config = s3_test_config();
        let fileset_fs = create_fileset_fs(Path::new("/gvfs_test3"), &config).await;
        let _ = fileset_fs.init().await;

        let mut tester = TestPathFileSystem::new(Path::new("/"), fileset_fs);
        tester.test_path_file_system().await;
    }

    /// Runs the raw-filesystem test suite against an S3-backed fileset wrapped in
    /// `DefaultRawFileSystem` (gated by `test_enable_with!(RUN_TEST_WITH_S3)`).
    #[tokio::test]
    async fn s3_ut_test_fileset_with_raw_file_system() {
        test_enable_with!(RUN_TEST_WITH_S3);

        let config = s3_test_config();
        let fileset_fs = create_fileset_fs(Path::new("/gvfs_test4"), &config).await;

        let raw_fs = DefaultRawFileSystem::new(
            fileset_fs,
            &AppConfig::default(),
            &FileSystemContext::default(),
        );
        let _ = raw_fs.init().await;

        let mut tester = TestRawFileSystem::new(Path::new("/"), raw_fs);
        tester.test_raw_file_system().await;
    }
}