// src/fs.rs

// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use fuser::{
    FileAttr, FileType, Filesystem, KernelConfig, ReplyAttr, ReplyCreate, ReplyData,
    ReplyDirectory, ReplyEmpty, ReplyEntry, ReplyWrite, Request,
};
use libc::{EIO, ENOENT, ENOTDIR};
use std::collections::HashMap;
use std::ffi::OsStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use crate::gcs::{Object, ResumableUploadCursor};
use crate::http::GcsHttpClient;

/// Inode numbers handed out by this filesystem (the root gets 1).
pub type Inode = u64;

// TODO(boulos): What's a reasonable TTL? Since we're focused on
// read-only, let's set at least 30s. Amusingly, Free BSD now treats
// *all* numbers > 0 as "cache forever" (which is probably what we
// want, with invalidation). Capitalizing the S for seconds would be
// confusing, so disabling that warning.
#[allow(non_upper_case_globals)]
const TTL_30s: Duration = Duration::from_secs(30);

// It's not clear if anything cares about this.
const HARDCODED_BLOCKSIZE: u32 = 512;

// An in-memory directory: its GCS prefix ("name") plus (entry name,
// inode) pairs for its direct children.
// NOTE(review): "Psuedo" is a typo for "Pseudo"; renaming would touch
// every use in this file, so it is left as-is here.
struct PsuedoDir {
    name: String,
    // inline
    entries: Vec<(String, Inode)>,
}

// TODO(boulos): Decide if we just use BTreeMap rather than HashMaps and Vecs...
/// A FUSE filesystem backed by a single GCS bucket, optionally rooted
/// at a prefix. Directory/attr metadata is loaded eagerly at
/// construction time and cached in the maps below.
pub struct GCSFS {
    // Inode => Attr
    inode_to_attr: RwLock<HashMap<Inode, FileAttr>>,
    // From inode to raw GCS Object.
    inode_to_obj: RwLock<HashMap<Inode, Object>>,
    // From inode => the path name
    directory_map: RwLock<HashMap<Inode, PsuedoDir>>,
    // We probably need some way to implement forget, but whatever.
    inode_counter: Mutex<Inode>,
    // So we can refer into the file handle map.
    fh_counter: AtomicU64,
    // In-progress resumable uploads, keyed by file handle.
    file_handles: RwLock<HashMap<u64, ResumableUploadCursor>>,
    // GCS configuration
    gcs_bucket: String,
    gcs_prefix: Option<String>,
    // Persistent client
    gcs_client: GcsHttpClient,
    // And our runtime for waiting out async.
    tokio_rt: tokio::runtime::Runtime,
}

impl GCSFS {
    /// Build the filesystem and eagerly load the tree under `prefix`
    /// (the whole bucket when `prefix` is `None`).
    /// NOTE(review): this calls `load_dir` (network I/O) before
    /// returning, so construction can be slow on large buckets.
    pub fn new(bucket: String, prefix: Option<String>) -> Self {
        info!(
            "Making a GCSFS for Bucket {} w/ Prefix {:#?}!",
            bucket, prefix
        );

        let result = GCSFS {
            inode_to_attr: RwLock::new(HashMap::new()),
            inode_to_obj: RwLock::new(HashMap::new()),
            directory_map: RwLock::new(HashMap::new()),
            inode_counter: Mutex::new(0),
            // Start at 1, so we can hand out fh=0 as "no fh"
            fh_counter: AtomicU64::new(1),
            file_handles: RwLock::new(HashMap::new()),
            gcs_bucket: bucket,
            gcs_prefix: prefix,
            gcs_client: super::http::new_client(),
            tokio_rt: tokio::runtime::Runtime::new().unwrap(),
        };

        // Load the bucket from the prefix.
        let root_inode = result.load_dir(result.gcs_prefix.clone(), None);
        debug!("root inode => {}", root_inode);

        // Now we're ready to be started as a Filesystem.
        result
    }

    /// Hand out the next inode number.
    fn get_inode(&self) -> Inode {
        // Grab an inode. We pre-increment so that the root inode gets 1.
        let mut data = self.inode_counter.lock().unwrap();
        *data += 1;
        *data
    }

    /// Register an in-progress upload cursor and return its file handle.
    fn make_fh(&self, cursor: ResumableUploadCursor) -> u64 {
        let fh = self.fh_counter.fetch_add(1, Ordering::SeqCst);
        // Put the cursor into our hash map.
        self.file_handles.write().unwrap().insert(fh, cursor);
        // return the handle.
        fh
    }

    #[allow(dead_code)]
    // I'm not ready to use this, but I'm going to want it.
    /// Forget the upload cursor for a closed file handle.
    fn drop_fh(&self, fh: u64) {
        let _ = self.file_handles.write().unwrap().remove(&fh);
    }

    /// Record one GCS object as a regular file: allocate an inode,
    /// synthesize its `FileAttr` from the object metadata, and keep the
    /// raw `Object` around for later reads.
    fn load_file(&self, _full_path: String, obj: Object) -> Inode {
        let inode = self.get_inode();

        // NOTE(boulos): This is pretty noisy on boot and env_logger doesn't seem to have
        // some sort of "super noisy debug". So comment in/out if you need to debug the
        // loading process.
        //debug!("  GCSFS. Loading {}", _full_path);

        let mtime: SystemTime = obj.updated.into();
        let ctime: SystemTime = obj.time_created.into();
        // GCS doesn't have atime, use mtime
        let atime = mtime;

        let file_attr: FileAttr = FileAttr {
            ino: inode,
            size: obj.size,
            blocks: 1, /* grr. obj.size / blksize? */
            atime,
            mtime,
            ctime,
            crtime: ctime,
            kind: FileType::RegularFile,
            perm: 0o755, /* Mark everything as 755 */
            nlink: 1,
            uid: 501,
            gid: 20,
            rdev: 0,
            flags: 0,
            blksize: HARDCODED_BLOCKSIZE,
        };

        self.inode_to_attr.write().unwrap().insert(inode, file_attr);
        self.inode_to_obj.write().unwrap().insert(inode, obj);
        inode
    }

    // Given a bucket, and a prefix (the directory), load it
    /// Recursively load the directory at `prefix` (None == bucket root)
    /// and return its inode. Subdirectories are loaded in parallel on
    /// the rayon thread pool; direct objects become file entries.
    fn load_dir(&self, prefix: Option<String>, parent: Option<Inode>) -> Inode {
        let bucket_clone = self.gcs_bucket.clone();
        let prefix_clone = prefix.clone();
        let prefix_for_load: String = match prefix {
            Some(prefix_str) => prefix_str,
            None => String::from(""),
        };

        // As above (this is too noisy for debugging regular FS operation).
        //debug!("  GCSFS. DIR {}", prefix_for_load);

        // Always use / as delim.
        let (single_level_objs, subdirs) = self
            .tokio_rt
            .block_on(async {
                super::gcs::list_objects(
                    &self.gcs_client,
                    bucket_clone.as_ref(),
                    prefix_clone.as_deref(),
                    Some("/"),
                )
                .await
            })
            .unwrap();

        let dir_inode = self.get_inode();
        let dir_time: SystemTime = UNIX_EPOCH + Duration::new(1534812086, 0); // 2018-08-20 15:41 Pacific

        let dir_attr: FileAttr = FileAttr {
            ino: dir_inode,
            size: 0,
            blocks: 0,
            atime: dir_time,
            mtime: dir_time,
            ctime: dir_time,
            crtime: dir_time,
            kind: FileType::Directory,
            perm: 0o755,
            // NOTE(review): counts only direct objects (+2 for "."/"..");
            // subdirectories arguably belong in nlink too — confirm.
            nlink: (single_level_objs.len() + 2) as u32,
            uid: 501,
            gid: 20,
            rdev: 0,
            flags: 0,
            blksize: HARDCODED_BLOCKSIZE,
        };

        self.inode_to_attr
            .write()
            .unwrap()
            .insert(dir_inode, dir_attr);

        // The root directory is its own ".." parent.
        let parent_inode = match parent {
            Some(parent_val) => parent_val,
            None => dir_inode,
        };

        let mut dir_entries: Vec<(String, Inode)> = vec![
            (String::from("."), dir_inode), /* self link */
            (String::from(".."), parent_inode),
        ];

        // GCS returns paths relative to the root of the bucket for
        // obj.name. Strip off the prefix to get the "filename".
        let base_dir_index = prefix_for_load.len();

        // Load the subdirectories in parallel, gathering up their results in order.
        let subdir_len = subdirs.len();
        let inodes: Arc<RwLock<Vec<Inode>>> = Arc::new(RwLock::new(Vec::with_capacity(subdir_len)));
        // Pre-fill the array with 0s, so we can write into each slot blindly later.
        inodes
            .write()
            .unwrap()
            .resize_with(subdir_len, Default::default);

        rayon::scope(|s| {
            // NOTE(boulos): We have to do this so that the move
            // closure below doesn't capture the real self / we
            // indicate that its lifetime matches that of this Rayon
            // scope.
            let shadow_self = &self;
            // Loop over all the subdirs, recursively loading them.
            for (i, dir) in subdirs.iter().enumerate() {
                let inodes_clone = Arc::clone(&inodes);
                s.spawn(move |_| {
                    let inode = shadow_self.load_dir(Some(dir.to_string()), Some(dir_inode));
                    // Each task writes only its own pre-sized slot.
                    let mut write_context = inodes_clone.write().unwrap();
                    if let Some(elem) = write_context.get_mut(i) {
                        *elem = inode;
                    } else {
                        println!(
                            "ERROR: Tried to write inode '{}' to index {} \
                             for directory {} (subdirs has len {})",
                            inode, i, dir, subdir_len
                        );
                    }
                });
            }
        });

        for (i, dir) in subdirs.iter().enumerate() {
            // To insert the "directory name", we get the basedir and
            // strip the trailing slash.
            let last_slash = dir.len() - 1;
            let dir_str = dir[base_dir_index..last_slash].to_string();
            let read_context = inodes.read().unwrap();
            dir_entries.push((dir_str, read_context[i]));
        }

        // Loop over all the direct objects, adding them to our maps
        for obj in single_level_objs {
            // Extract just the portion that is the "file name".
            let file_str = obj.name[base_dir_index..].to_string();
            let full_path = format!("{}{}", prefix_for_load, file_str);
            let inode = self.load_file(full_path, obj);
            dir_entries.push((file_str, inode));
        }

        //debug!("  Created dir_entries: {:#?}", dir_entries);
        self.directory_map.write().unwrap().insert(
            dir_inode,
            PsuedoDir {
                name: prefix_for_load,
                entries: dir_entries,
            },
        );

        dir_inode
    }
}

impl Filesystem for GCSFS {
    fn init(&mut self, _req: &Request, config: &mut KernelConfig) -> Result<(), i32> {
        debug!("Kernelconfig is {:#?}", config);
        Ok(())
    }

    /// Resolve `name` within directory `parent` via a linear scan of
    /// its cached entries.
    fn lookup(&mut self, _req: &Request, parent: Inode, name: &OsStr, reply: ReplyEntry) {
        debug!("lookup(parent={}, name={})", parent, name.to_str().unwrap());
        if let Some(dir_ent) = self.directory_map.read().unwrap().get(&parent) {
            // TODO(boulos): Is this the full name, or just the portion? (I believe just portion)
            let search_name = name.to_str().unwrap().to_string();
            for child_pair in dir_ent.entries.iter() {
                trace!(
                    "  Is search target '{}' == dir_entry '{}'?",
                    search_name,
                    child_pair.0
                );
                if child_pair.0 == search_name {
                    if let Some(attr) = self.inode_to_attr.read().unwrap().get(&child_pair.1) {
                        // Found it! Return the info for the inode.
                        debug!(
                            "  Found it! search target '{}' is inode {}",
                            child_pair.0, child_pair.1
                        );
                        reply.entry(&TTL_30s, &attr, 0);
                        return;
                    }
                }
            }
            debug!("  Never found '{}'", search_name);
        }
        reply.error(ENOENT);
    }

    fn getattr(&mut self, _req: &Request, inode: Inode, reply: ReplyAttr) {
        debug!("Trying to getattr() on inode {}", inode);
        if let Some(attr) = self.inode_to_attr.read().unwrap().get(&inode) {
            reply.attr(&TTL_30s, &attr);
        } else {
            reply.error(ENOENT);
        }
    }

    /// Read `_size` bytes at `offset` by issuing a ranged GET to GCS
    /// (no local data caching).
    fn read(
        &mut self,
        _req: &Request,
        inode: Inode,
        _fh: u64,
        offset: i64,
        _size: u32,
        _flags: i32,
        _lock_owner: Option<u64>,
        reply: ReplyData,
    ) {
        debug!(
            "Trying to read() {} on {} at offset {}",
            _size, inode, offset
        );
        if let Some(obj) = self.inode_to_obj.read().unwrap().get(&inode) {
            debug!("  Performing read for obj: {:#?}", obj);

            let result = self.tokio_rt.block_on(async {
                super::gcs::get_bytes_with_client(
                    &self.gcs_client,
                    obj,
                    offset as u64,
                    _size as u64,
                )
                .await
            });

            match result {
                Ok(bytes) => {
                    reply.data(&bytes);
                }
                Err(e) => {
                    debug!("  get_bytes failed. Error {:#?}", e);
                    reply.error(EIO);
                }
            }
        } else {
            debug!("  failed to find the object for inode {}", inode);
            reply.error(ENOENT);
        }
    }

    /// List the cached entries of directory `inode`, resuming at the
    /// kernel-supplied `offset`.
    fn readdir(
        &mut self,
        _req: &Request,
        inode: Inode,
        _fh: u64,
        offset: i64,
        mut reply: ReplyDirectory,
    ) {
        debug!("Trying to readdir on {} with offset {}", inode, offset);

        if let Some(dir_ent) = self.directory_map.read().unwrap().get(&inode) {
            debug!(
                "  directory {} has {} entries ({:#?})",
                inode,
                dir_ent.entries.len(),
                dir_ent.entries
            );

            // The offset reported per entry is what the kernel passes
            // back to resume *after* that entry.
            let mut absolute_index = offset + 1;

            for (idx, ref child_pair) in dir_ent.entries.iter().skip(offset as usize).enumerate() {
                debug!(
                    "  looking at entry {}, got back pair {:#?}",
                    idx, child_pair
                );
                if let Some(child_ent) = self.inode_to_attr.read().unwrap().get(&child_pair.1) {
                    debug!(
                        "  readdir for inode {}, adding '{}' as inode {}",
                        inode, child_pair.0, child_pair.1
                    );
                    if reply.add(
                        child_pair.1,
                        absolute_index as i64,
                        child_ent.kind,
                        &child_pair.0,
                    ) {
                        // We've filled up our reply buffer. Exit.
                        break;
                    }
                    absolute_index += 1;
                } else {
                    debug!("  readdir for inode {}, could not find inode {} which was given in dir_ent as '{}'", inode, child_pair.1, child_pair.0);
                    reply.error(ENOENT);
                    return;
                }
            }
            reply.ok();
        } else {
            reply.error(ENOENT);
        }
    }

    /// Create a file or directory under `parent`. Regular files kick
    /// off a GCS resumable upload and return a live file handle.
    fn create(
        &mut self,
        req: &Request<'_>,
        parent: u64,
        name: &OsStr,
        mode: u32,
        _umask: u32,
        flags: i32,
        reply: ReplyCreate,
    ) {
        debug!(
            "create(parent_dir = {}, path = {:#?}, mode = {}, flags = {:o})",
            parent, name, mode, flags
        );

        let file_type = (mode as libc::mode_t) & libc::S_IFMT;

        if file_type != libc::S_IFREG && file_type != libc::S_IFDIR {
            warn!(
                "create called for file_type {}. But we only handle FILE/DIR",
                file_type
            );
            reply.error(libc::EINVAL);
            return;
        }

        // Grab a scoped lock for the directory map (we'll update the directory)
        let mut dir_map_lock = self.directory_map.write().unwrap();

        // Find the parent in the directory map.
        let parent_ent = dir_map_lock.get_mut(&parent);

        // NOTE(boulos): I think FUSE does this check already.
if parent_ent.is_none() { debug!(" -- warning/error:fuse_create called w/o parent directory"); reply.error(ENOTDIR); return; } let parent_dir = parent_ent.unwrap(); let dir_entries = &mut parent_dir.entries; let search_name = name.to_str().unwrap().to_string(); let full_name = match parent_dir.name.len() { // Don't include the leading / for the root directory. 0 => search_name.clone(), _ => format!("{}/{}", parent_dir.name, search_name), }; #[cfg(debug)] // FUSE isn't supposed to call this for existing entries. Double check in debug mode. for child_pair in dir_entries.iter() { if child_pair.0 == search_name { debug!(" -- warning/error: File {} already exists!", search_name); reply.error(EEXIST); return; } } // Make a new inode for our new file or directory. let inode = self.get_inode(); let now = SystemTime::now(); let kind = match file_type { libc::S_IFREG => FileType::RegularFile, libc::S_IFDIR => FileType::Directory, _ => unreachable!(), }; // If it's going to be a regular file, try to initiate the // Upload. If this fails, we've burned an inode, but whatever. let fh = match kind { FileType::RegularFile => { let create_result = self.tokio_rt.block_on(async { super::gcs::create_object_with_client( &self.gcs_client, &self.gcs_bucket, &full_name, ) .await }); if create_result.is_err() { // We failed (and warned internally). Bubble up an EIO. reply.error(libc::EIO); return; } // Make ourselves a file handle! self.make_fh(create_result.unwrap()) } // Directories don't need FHs for now. 
            _ => 0,
        };

        let attrs: FileAttr = FileAttr {
            ino: inode,
            size: 0,
            blocks: 0,
            atime: now,
            mtime: now,
            ctime: now,
            crtime: now,
            kind,
            perm: 0o755, /* We could use mode, but whatever */
            nlink: match kind {
                FileType::RegularFile => 1,
                FileType::Directory => 2,
                _ => unreachable!(),
            },
            uid: req.uid(),
            gid: req.gid(),
            rdev: 0,
            flags: 0,
            blksize: HARDCODED_BLOCKSIZE,
        };

        // Put the node into our inode map
        self.inode_to_attr.write().unwrap().insert(inode, attrs);
        // Put the node into our parent directory listing.
        dir_entries.push((search_name.clone(), inode));

        if kind == FileType::Directory {
            // Make our own PsuedoDir
            let sub_entries: Vec<(String, Inode)> = vec![
                (String::from("."), inode), /* Self link */
                (String::from(".."), parent as Inode),
            ];
            dir_map_lock.insert(
                inode,
                PsuedoDir {
                    name: search_name,
                    entries: sub_entries,
                },
            );
        } else {
            // Otherwise, maybe prepare a stub Object?
        }

        reply.created(&TTL_30s, &attrs, 0, fh, 0);
    }

    /// Append-only write: bytes must land exactly at the upload
    /// cursor's current position (cursor.offset + buffered bytes);
    /// any other offset is rejected with EINVAL.
    fn write(
        &mut self,
        _req: &Request<'_>,
        inode: u64,
        fh: u64,
        offset: i64,
        data: &[u8],
        _write_flags: u32,
        _flags: i32,
        _lock_owner: Option<u64>,
        reply: ReplyWrite,
    ) {
        let now = SystemTime::now();
        debug!(
            "Got a write request. inode={}, fh={}, offset={}, size={}",
            inode,
            fh,
            offset,
            data.len()
        );

        let fh_clone = fh;
        let mut fh_map = self.file_handles.write().unwrap();
        let cursor_or_none = fh_map.get_mut(&fh_clone);
        if cursor_or_none.is_none() {
            error!("write(): didn't find the fh {}", fh);
            reply.error(libc::EBADF);
            return;
        }

        let inode_clone = inode;
        let mut attr_map = self.inode_to_attr.write().unwrap();
        let attr_or_none = attr_map.get_mut(&inode_clone);
        if attr_or_none.is_none() {
            error!("write(): Didn't find inode {}", inode);
            reply.error(libc::EBADF);
            return;
        }

        let cursor = cursor_or_none.unwrap();
        // Space left in the cursor's buffer before the next chunk upload.
        let remaining = cursor.buffer.capacity() - cursor.buffer.len();
        // The logical file position the next append must match.
        let cursor_position = cursor.offset + (cursor.buffer.len() as u64);

        debug!(
            "cursor has URI {}, offset {}, 'position' {}, and remaining {}",
            cursor.session_uri, cursor.offset, cursor_position, remaining
        );

        // Only allow writing at the offset.
        if (offset as u64) != cursor_position {
            error!(
                "Got a write for offset {}. But cursor is at {} (we only support appending)",
                offset, cursor_position
            );
            reply.error(libc::EINVAL);
            return;
        }

        let result = self.tokio_rt.block_on(async {
            super::gcs::append_bytes_with_client(&self.gcs_client, cursor, data).await
        });

        if result.is_err() {
            reply.error(libc::EIO);
            return;
        }

        let len = result.unwrap() as u64;
        // We wrote the bytes! Update our attrs.
        reply.written(len as u32);

        let mut attr = attr_or_none.unwrap();
        // Update our atime/mtime.
        attr.atime = now;
        attr.mtime = now;
        // Update the bytes written.
        attr.size += len;
    }

    // NOTE(boulos): Despite the name 'flush', this is called on *close*.
    /// Finalize the resumable upload for `fh` (FUSE calls flush() on
    /// close), then publish the finished Object so the file becomes
    /// readable through this filesystem.
    fn flush(
        &mut self,
        _req: &Request<'_>,
        inode: u64,
        fh: u64,
        _lock_owner: u64,
        reply: ReplyEmpty,
    ) {
        debug!("flush called for inode {} / fh {}", inode, fh);

        if fh == 0 {
            // Ignore fh 0
            reply.ok();
            return;
        }

        let now = SystemTime::now();
        let fh_clone = fh;
        let mut fh_map = self.file_handles.write().unwrap();
        let cursor_or_none = fh_map.get_mut(&fh_clone);
        if cursor_or_none.is_none() {
            error!("flush(): didn't find the fh {}", fh);
            reply.error(libc::EBADF);
            return;
        }

        let inode_clone = inode;
        let mut attr_map = self.inode_to_attr.write().unwrap();
        let attr_or_none = attr_map.get_mut(&inode_clone);
        if attr_or_none.is_none() {
            error!("flush(): Didn't find inode {}", inode);
            reply.error(libc::EBADF);
            return;
        }

        let cursor = cursor_or_none.unwrap();

        let result = self.tokio_rt.block_on(async {
            super::gcs::finalize_upload_with_client(&self.gcs_client, cursor).await
        });

        if result.is_err() {
            error!("finalze upload failed with err {:#?}", result);
            reply.error(libc::EIO);
            return;
        }

        // We've got an Object now! Update our FileAttr and map.
        let obj: Object = result.unwrap();
        debug!("Got back our object! {:#?}", obj);

        let mut attr = attr_or_none.unwrap();
        // Update our atime/mtime.
        attr.atime = now;
        attr.mtime = now;
        // Finalize the object size.
        attr.size = obj.size;

        // NOTE(review): the finished cursor stays in file_handles —
        // drop_fh() exists but is never called. Confirm whether a
        // release() handler should clean this up.
        // Put the Object into our map (allowing reads!).
        self.inode_to_obj.write().unwrap().insert(inode, obj);
        reply.ok();
    }
}

#[cfg(test)]
mod tests {
    extern crate env_logger;
    extern crate tempfile;

    use super::*;
    use std;
    use std::fs;
    use std::io::{Read, Write};
    use std::path::PathBuf;
    use std::process::Command;
    use tempfile::NamedTempFile;

    // A well-known public Landsat scene used by the read-only tests.
    const LANDSAT_SUBDIR: &str = "LC08_L1GT_044034_20130330_20170310_01_T2";
    const LANDSAT_B7_TIF: &str = "LC08_L1GT_044034_20130330_20170310_01_T2_B7.TIF";
    const LANDSAT_B7_MTL: &str = "LC08_L1GT_044034_20130330_20170310_01_T2_MTL.txt";

    fn init() {
        // https://docs.rs/env_logger/0.8.2/env_logger/index.html#capturing-logs-in-tests
        let _ = env_logger::builder().is_test(true).try_init();
    }

    /// Run `ls -l -a -F -G` in `cwd`, logging output; panics on failure.
    fn run_ls(cwd: &str) {
        info!("about to run ls -lFGa in cwd: {}", cwd);
        let output = Command::new("ls")
            .arg("-l")
            .arg("-a")
            .arg("-F")
            .arg("-G")
            .current_dir(cwd)
            .output()
            .expect("ls failed");
        info!("status: {}", output.status);
        info!("stdout: {}", String::from_utf8_lossy(&output.stdout));
        info!("stderr: {}", String::from_utf8_lossy(&output.stderr));
        assert!(output.status.success());
    }

    /// Run `cp src dst` in `cwd`, logging output and timing.
    fn run_cp(src_path: &str, dst_path: &str, cwd: &str) {
        info!("about to run cp {} {} in cwd: {}", src_path, dst_path, cwd);
        let now = std::time::Instant::now();
        let output = Command::new("cp")
            .arg(src_path)
            .arg(dst_path)
            .current_dir(cwd)
            .output()
            .expect("cp failed");
        info!("status: {}", output.status);
        info!("stdout: {}", String::from_utf8_lossy(&output.stdout));
        info!("stderr: {}", String::from_utf8_lossy(&output.stderr));
        info!("inside run_cp: {:#?}", now.elapsed());
        assert!(output.status.success());
    }

    /// Run `dd` with the given block size, logging output and timing.
    fn run_dd(src_path: &str, dst_path: &str, blksize_bytes: usize, cwd: &str) {
        info!(
            "about to run dd if={} of={} bs={} in cwd: {}",
            src_path, dst_path, blksize_bytes, cwd
        );
        let now = std::time::Instant::now();
        let output = Command::new("dd")
            .arg(format!("bs={}", blksize_bytes))
            .arg(format!("if={}", src_path))
            .arg(format!("of={}", dst_path))
            .current_dir(cwd)
            .output()
            .expect("dd failed");
        info!(
            "status: {}",
output.status); info!("stdout: {}", String::from_utf8_lossy(&output.stdout)); info!("stderr: {}", String::from_utf8_lossy(&output.stderr)); info!("inside run_dd: {:#?}", now.elapsed()); assert!(output.status.success()); } fn run_stat(path: &str, cwd: &str) { info!("about to run stat {} in cwd: {}", path, cwd); let now = std::time::Instant::now(); let output = Command::new("stat") .arg(path) .current_dir(cwd) .output() .expect("stat failed"); info!("status: {}", output.status); info!("stdout: {}", String::from_utf8_lossy(&output.stdout)); info!("stderr: {}", String::from_utf8_lossy(&output.stderr)); info!("inside run_cp: {:#?}", now.elapsed()); assert!(output.status.success()); } pub fn mount_bucket<'a>( bucket: String, prefix: Option<String>, mountpoint: String, read_only: bool, ) -> fuser::BackgroundSession { let fs = GCSFS::new(bucket, prefix); // I can't figure out an easy way to have a ro vs rw "let mode // =" that doesn't result in a confused borrow/free // outcome. So push the -o <ro/rw> and then add on the other // ones. let mut options: Vec<&OsStr> = Vec::new(); options.push("-o".as_ref()); if read_only { options.push("ro".as_ref()); } else { options.push("rw".as_ref()); } options.extend( [ "-o", "auto_unmount", "-o", "noatime", "-o", "fsname=gcsfuser", "-o", "noappledouble", /* Disable ._. and .DS_Store files */ ] .iter() .map(|o| o.as_ref()) .collect::<Vec<&OsStr>>(), ); info!( "Attempting to mount gcsfs @ {} with {:#?}", mountpoint, options ); fuser::spawn_mount(fs, &mountpoint, &options).unwrap() } pub fn mount_tempdir_ro<'a>(mountpoint: PathBuf) -> fuser::BackgroundSession { let bucket = "gcp-public-data-landsat"; // One level up to test subdir loading. 
        let prefix = "LC08/01/044/034/";
        mount_bucket(
            bucket.to_string(),
            Some(prefix.to_string()),
            mountpoint.to_str().unwrap().to_string(),
            true,
        )
    }

    /// Mount the caller's read/write test bucket (from
    /// $GCSFUSER_TEST_BUCKET) at `mountpoint`.
    pub fn mount_tempdir_rw<'a>(mountpoint: PathBuf) -> fuser::BackgroundSession {
        let bucket =
            std::env::var("GCSFUSER_TEST_BUCKET").expect("You must provide a read/write bucket");
        mount_bucket(
            bucket,
            None,
            mountpoint.to_str().unwrap().to_string(),
            false,
        )
    }

    // Smoke test: mounting alone should succeed.
    #[test]
    fn just_mount<'a>() {
        init();
        let dir = tempfile::Builder::new()
            .prefix("just_mount")
            .tempdir()
            .unwrap();
        let mnt = dir.into_path();
        let mnt_str = String::from(mnt.to_str().unwrap());
        let daemon = mount_tempdir_ro(mnt);
        info!("mounted fs at {} in thread {:#?}", mnt_str, daemon);
    }

    // Read a small text file (the MTL metadata) end to end.
    #[test]
    fn mount_and_read<'a>() {
        init();
        let dir = tempfile::Builder::new()
            .prefix("mount_and_read")
            .tempdir()
            .unwrap();
        let mnt = dir.into_path();
        let mnt_str = String::from(mnt.to_str().unwrap());
        let daemon = mount_tempdir_ro(mnt);
        info!("mounted fs at {} in thread {:#?}", mnt_str, daemon);

        let to_open = format!("{}/{}/{}", mnt_str, LANDSAT_SUBDIR, LANDSAT_B7_MTL);
        info!("Try to open '{}'", to_open);
        let result = fs::read_to_string(to_open).unwrap();
        info!("  got back {}", result);
        drop(daemon);
    }

    // Read 1 MiB from the (large) TIF through the page cache.
    #[test]
    fn large_read<'a>() {
        init();
        let dir = tempfile::Builder::new()
            .prefix("large_read")
            .tempdir()
            .unwrap();
        let mnt = dir.into_path();
        let mnt_str = String::from(mnt.to_str().unwrap());
        let daemon = mount_tempdir_ro(mnt);
        info!("mounted fs at {} in thread {:#?}", mnt_str, daemon);

        let tif_file = LANDSAT_B7_TIF;
        let sub_dir = LANDSAT_SUBDIR;
        let to_open = format!("{}/{}/{}", mnt_str, sub_dir, tif_file);
        info!("Try to open '{}'", to_open);

        let mut fh = std::fs::OpenOptions::new()
            .read(true)
            .open(to_open)
            .expect("Failed to open file");
        let mut buffer = [0; 1024 * 1024];
        info!("About to read 1MB from {:#?}", fh);
        let result = fh.read(&mut buffer);
        info!("  got back {:#?}", result);
        drop(daemon);
    }

    // macOS doesn't have O_DIRECT
    // Same 1 MiB read as large_read, but bypassing the page cache.
    #[cfg(not(target_os = "macos"))]
    #[test]
    fn direct_read<'a>() {
        init();
        let dir = tempfile::Builder::new()
            .prefix("direct_read")
            .tempdir()
            .unwrap();
        let mnt = dir.into_path();
        let mnt_str = String::from(mnt.to_str().unwrap());
        let daemon = mount_tempdir_ro(mnt);
        info!("mounted fs at {} in thread {:#?}", mnt_str, daemon);

        let tif_file = LANDSAT_B7_TIF;
        let sub_dir = LANDSAT_SUBDIR;
        let to_open = format!("{}/{}/{}", mnt_str, sub_dir, tif_file);
        info!("Try to open '{}'", to_open);

        use std::os::unix::fs::OpenOptionsExt;
        let mut fh = std::fs::OpenOptions::new()
            .read(true)
            .custom_flags(libc::O_DIRECT)
            .open(to_open)
            .expect("Failed to open file");
        let mut buffer = [0; 1024 * 1024];
        info!("About to read 1MB from {:#?}", fh);
        let result = fh.read(&mut buffer);
        info!("  got back {:#?}", result);
        drop(daemon);
    }

    // Write a short string through the rw mount and sync it.
    #[test]
    fn small_write<'a>() {
        init();
        let dir = tempfile::Builder::new()
            .prefix("mount_and_write")
            .tempdir()
            .unwrap();
        let mnt = dir.into_path();
        let mnt_str = String::from(mnt.to_str().unwrap());
        let daemon = mount_tempdir_rw(mnt);
        info!("mounted fs at {} in thread {:#?}", mnt_str, daemon);

        let mut tmp_file = NamedTempFile::new_in(mnt_str).unwrap();
        info!("Opened '{:#?}'", tmp_file.path());
        let txt_file = tmp_file.as_file_mut();

        let write_result = txt_file.write_all(b"My first words!");
        info!("  got back {:#?}", write_result);
        assert!(write_result.is_ok());

        // sync the file to see if we have any other errors.
        let sync_result = txt_file.sync_all();
        info!("  sync result => {:#?}", sync_result);
        assert!(sync_result.is_ok());

        // drop the file to close it.
        drop(txt_file);
        info!("Sleeping for 1s, to wait for the FS to be flush.");
        std::thread::sleep(Duration::from_millis(1000));
        // Drop the daemon to clean up.
        drop(daemon);
    }

    // Exercise the upload buffering/rounding paths with a mix of
    // write sizes (tiny, buffer-filling, multi-chunk, and leftover).
    #[test]
    fn large_write<'a>() {
        init();
        let dir = tempfile::Builder::new()
            .prefix("large_write")
            .tempdir()
            .unwrap();
        let mnt = dir.into_path();
        let mnt_str = String::from(mnt.to_str().unwrap());
        let daemon = mount_tempdir_rw(mnt);
        info!("mounted fs at {} in thread {:#?}", mnt_str, daemon);

        let mut tmp_file = NamedTempFile::new_in(mnt_str).unwrap();
        info!("Opened '{:#?}'", tmp_file.path());
        let file = tmp_file.as_file_mut();

        // Make lots of 0123456789 piles and test out our rounding code.
        let small_amt = 20;
        // This should trigger a flush, and then append.
        let medium_amt = 350 * 1024;
        // This will fill the buffer and flush.
        let round_up = 512 * 1024;
        // This will send several *without* buffering.
        let several_chunks = 2048 * 1024;
        // This will then send 256 KiB, leave some of left over for a final close.
        let plus_leftover = several_chunks + 384 * 1024;

        let small_write: Vec<u8> = (0..small_amt).map(|x| (48 + (x % 10)) as u8).collect();
        let big_write: Vec<u8> = (small_amt..medium_amt)
            .map(|x| (48 + (x % 10)) as u8)
            .collect();
        let round_up_write: Vec<u8> = (medium_amt..round_up)
            .map(|x| (48 + (x % 10)) as u8)
            .collect();
        let chunk_write: Vec<u8> = (round_up..several_chunks)
            .map(|x| (48 + (x % 10)) as u8)
            .collect();
        let final_write: Vec<u8> = (several_chunks..plus_leftover)
            .map(|x| (48 + (x % 10)) as u8)
            .collect();

        let all_writes = vec![
            small_write,
            big_write,
            round_up_write,
            chunk_write,
            final_write,
        ];

        for write_test in all_writes {
            let write_result = file.write_all(&write_test);
            info!("  got back {:#?}", write_result);
            assert!(write_result.is_ok());
        }

        // sync the file to see if we have any other errors.
        let sync_result = file.sync_all();
        info!("  sync result => {:#?}", sync_result);
        assert!(sync_result.is_ok());

        // drop the file to close and unlink it.
        drop(file);
        info!("Sleeping for 1s, to wait for the FS to be flush.");
        std::thread::sleep(Duration::from_millis(1000));
        // Drop the daemon to clean up.
        drop(daemon);
    }

    // Listing via the real `ls` binary, at the root and in a subdir.
    #[test]
    fn mount_and_ls<'a>() {
        init();
        // Mount the filesystem and run ls.
        info!("Running mount_and_ls");
        let dir = tempfile::Builder::new()
            .prefix("mount_and_ls")
            .tempdir()
            .unwrap();
        let mnt = dir.into_path();
        let mnt_str = String::from(mnt.to_str().unwrap());
        let fs = mount_tempdir_ro(mnt);
        info!("mounted fs at {} on thread {:#?}", mnt_str, fs);
        run_ls(&mnt_str);

        let subdir = format!("{}/{}", mnt_str, LANDSAT_SUBDIR);
        info!("now ls in the subdir {}", subdir);
        run_ls(&subdir);
        drop(fs);
    }

    #[test]
    #[ignore] // This test copies the entire 70MB file, so don't usually run it.
    fn mount_and_cp<'a>() {
        init();
        let dir = tempfile::Builder::new()
            .prefix("mount_and_cp")
            .tempdir()
            .unwrap();
        let output_dir = tempfile::Builder::new()
            .prefix("mount_and_cp_output")
            .tempdir()
            .unwrap();
        let mnt = dir.into_path();
        let mnt_str = String::from(mnt.to_str().unwrap());
        let daemon = mount_tempdir_ro(mnt);
        info!("mounted fs at {} in thread {:#?}", mnt_str, daemon);

        let tif_file = LANDSAT_B7_TIF;
        let sub_dir = LANDSAT_SUBDIR;
        let full_path = format!("{}/{}/{}", mnt_str, sub_dir, tif_file);
        let dst_path = format!("{}/{}", output_dir.path().to_str().unwrap(), tif_file);

        let stat_time = std::time::Instant::now();
        info!("Calling stat to trigger init");
        run_stat(&full_path, &mnt_str);
        info!("stat completed in {:#?}", stat_time.elapsed());

        let now = std::time::Instant::now();
        run_cp(&full_path, &dst_path, &mnt_str);
        println!("cp took {:#?}", now.elapsed());
        drop(daemon);
    }

    #[test]
    #[ignore] // This test also copies the whole 70MB file, but via dd.
    fn test_dd<'a>() {
        init();
        let dir = tempfile::Builder::new()
            .prefix("test_dd")
            .tempdir()
            .unwrap();
        let output_dir = tempfile::Builder::new()
            .prefix("test_dd_output")
            .tempdir()
            .unwrap();
        let mnt = dir.into_path();
        let mnt_str = String::from(mnt.to_str().unwrap());
        let daemon = mount_tempdir_ro(mnt);
        info!("mounted fs at {} in thread {:#?}", mnt_str, daemon);

        let tif_file = LANDSAT_B7_TIF;
        let sub_dir = LANDSAT_SUBDIR;
        let full_path = format!("{}/{}/{}", mnt_str, sub_dir, tif_file);
        let dst_path = format!("{}/{}", output_dir.path().to_str().unwrap(), tif_file);

        let stat_time = std::time::Instant::now();
        info!("Calling stat to trigger init");
        run_stat(&full_path, &mnt_str);
        info!("stat completed in {:#?}", stat_time.elapsed());

        let now = std::time::Instant::now();
        run_dd(&full_path, &dst_path, 1 * 1024 * 1024, &mnt_str);
        println!("dd took {:#?}", now.elapsed());
        drop(daemon);
    }
}