crates/core/src/file_group/mod.rs (244 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ //! This module is for File Group related models and APIs. //! //! A set of data/base files + set of log files, that make up a unit for all operations. pub mod base_file; pub mod builder; pub mod file_slice; pub mod log_file; pub mod reader; use crate::error::CoreError; use crate::file_group::base_file::BaseFile; use crate::file_group::log_file::LogFile; use crate::Result; use file_slice::FileSlice; use std::collections::BTreeMap; use std::fmt; use std::fmt::Formatter; use std::hash::{Hash, Hasher}; use std::str::FromStr; /// A [FileGroup] contains multiple [FileSlice]s within a partition, /// and it can be uniquely identified by `file_id` across the table. /// /// The [FileSlice]s are ordered by the commit timestamps that indicate the creation of the /// [FileSlice]. #[derive(Clone, Debug)] pub struct FileGroup { pub file_id: String, pub partition_path: String, pub file_slices: BTreeMap<String, FileSlice>, } impl PartialEq for FileGroup { fn eq(&self, other: &Self) -> bool { self.file_id == other.file_id && self.partition_path == other.partition_path } } impl Eq for FileGroup {} impl Hash for FileGroup { fn hash<H: Hasher>(&self, state: &mut H) { self.file_id.hash(state); self.partition_path.hash(state); } } impl fmt::Display for FileGroup { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.write_str( format!( "File Group: partition={}, id={}", &self.partition_path, &self.file_id ) .as_str(), ) } } impl FileGroup { /// Create a new [FileGroup] with the given `file_id` and `partition_path` with no [FileSlice]s. pub fn new(file_id: String, partition_path: String) -> Self { Self { file_id, partition_path, file_slices: BTreeMap::new(), } } /// Create a new [FileGroup] with a [BaseFile]'s file name. pub fn new_with_base_file_name(file_name: &str, partition_path: &str) -> Result<Self> { let base_file = BaseFile::from_str(file_name)?; let file_id = base_file.file_id.clone(); let mut file_group = Self::new(file_id, partition_path.to_string()); file_group.add_base_file(base_file)?; Ok(file_group) } pub fn merge(&mut self, other: &FileGroup) -> Result<()> { if self != other { return Err(CoreError::FileGroup(format!( "Cannot merge different file groups: {self} and {other}", ))); } for (commit_timestamp, other_file_slice) in other.file_slices.iter() { if let Some(existing_file_slice) = self.file_slices.get_mut(commit_timestamp) { existing_file_slice.merge(other_file_slice)?; } else { self.file_slices .insert(commit_timestamp.clone(), other_file_slice.clone()); } } Ok(()) } /// Add a [BaseFile] based on the file name to the corresponding [FileSlice] in the [FileGroup]. pub fn add_base_file_from_name(&mut self, file_name: &str) -> Result<&Self> { let base_file = BaseFile::from_str(file_name)?; self.add_base_file(base_file) } /// Add a [BaseFile] to the corresponding [FileSlice] in the [FileGroup]. pub fn add_base_file(&mut self, base_file: BaseFile) -> Result<&Self> { let commit_timestamp = base_file.commit_timestamp.as_str(); if self.file_slices.contains_key(commit_timestamp) { Err(CoreError::FileGroup(format!( "Instant time {commit_timestamp} is already present in File Group {}", self.file_id ))) } else { self.file_slices.insert( commit_timestamp.to_owned(), FileSlice::new(base_file, self.partition_path.clone()), ); Ok(self) } } /// Add multiple [BaseFile]s to the corresponding [FileSlice]s in the [FileGroup]. pub fn add_base_files<I>(&mut self, base_files: I) -> Result<&Self> where I: IntoIterator<Item = BaseFile>, { for base_file in base_files { self.add_base_file(base_file)?; } Ok(self) } /// Add a [LogFile] based on the file name to the corresponding [FileSlice] in the [FileGroup]. pub fn add_log_file_from_name(&mut self, file_name: &str) -> Result<&Self> { let log_file = LogFile::from_str(file_name)?; self.add_log_file(log_file) } /// Add multiple [LogFile]s based on the file names to the corresponding [FileSlice]s in the /// [FileGroup]. pub fn add_log_files_from_names<I, S>(&mut self, log_file_names: I) -> Result<&Self> where I: IntoIterator<Item = S>, S: AsRef<str>, { for file_name in log_file_names { self.add_log_file_from_name(file_name.as_ref())?; } Ok(self) } /// Add a [LogFile] to the corresponding [FileSlice] in the [FileGroup]. /// /// TODO: support adding log files to file group without base files. pub fn add_log_file(&mut self, log_file: LogFile) -> Result<&Self> { let commit_timestamp = log_file.base_commit_timestamp.as_str(); if let Some(file_slice) = self.file_slices.get_mut(commit_timestamp) { file_slice.log_files.insert(log_file); Ok(self) } else { Err(CoreError::FileGroup(format!( "Instant time {commit_timestamp} not found in File Group {}", self.file_id ))) } } /// Add multiple [LogFile]s to the corresponding [FileSlice]s in the [FileGroup]. pub fn add_log_files<I>(&mut self, log_files: I) -> Result<&Self> where I: IntoIterator<Item = LogFile>, { for log_file in log_files { self.add_log_file(log_file)?; } Ok(self) } /// Retrieves a reference to the closest [FileSlice] that was created on or before the given /// `timestamp`. pub fn get_file_slice_as_of(&self, timestamp: &str) -> Option<&FileSlice> { let as_of = timestamp.to_string(); if let Some((_, file_slice)) = self.file_slices.range(..=as_of).next_back() { Some(file_slice) } else { None } } /// Retrieves a mutable reference to the closest [FileSlice] that was created on or before the /// given `timestamp`. pub fn get_file_slice_mut_as_of(&mut self, timestamp: &str) -> Option<&mut FileSlice> { let as_of = timestamp.to_string(); if let Some((_, file_slice)) = self.file_slices.range_mut(..=as_of).next_back() { Some(file_slice) } else { None } } } #[cfg(test)] mod tests { use super::*; use crate::table::partition::EMPTY_PARTITION_PATH; #[test] fn load_a_valid_file_group() { let mut fg = FileGroup::new( "5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(), EMPTY_PARTITION_PATH.to_string(), ); let _ = fg.add_base_file_from_name( "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet", ); let _ = fg.add_base_file_from_name( "5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402123035233.parquet", ); assert_eq!(fg.file_slices.len(), 2); assert_eq!(fg.partition_path, EMPTY_PARTITION_PATH); let commit_times: Vec<&str> = fg.file_slices.keys().map(|k| k.as_str()).collect(); assert_eq!(commit_times, vec!["20240402123035233", "20240402144910683"]); assert_eq!( fg.get_file_slice_as_of("20240402123035233") .unwrap() .base_file .commit_timestamp, "20240402123035233" ); assert!(fg.get_file_slice_as_of("-1").is_none()); } #[test] fn add_base_file_with_same_commit_time_should_fail() { let mut fg = FileGroup::new( "5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(), EMPTY_PARTITION_PATH.to_string(), ); let res1 = fg.add_base_file_from_name( "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet", ); assert!(res1.is_ok()); let res2 = fg.add_base_file_from_name( "5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402144910683.parquet", ); assert!(res2.is_err()); assert_eq!(res2.unwrap_err().to_string(), "File group error: Instant time 20240402144910683 is already present in File Group 5a226868-2934-4f84-a16f-55124630c68d-0"); } #[test] fn test_file_group_display() { let file_group = FileGroup { file_id: "group123".to_string(), partition_path: "part/2023-01-01".to_string(), file_slices: BTreeMap::new(), }; let display_string = format!("{}", file_group); assert_eq!( display_string, "File Group: partition=part/2023-01-01, id=group123" ); let file_group_no_partition = FileGroup { file_id: "group456".to_string(), partition_path: EMPTY_PARTITION_PATH.to_string(), file_slices: BTreeMap::new(), }; let display_string_no_partition = format!("{}", file_group_no_partition); assert_eq!( display_string_no_partition, "File Group: partition=, id=group456" ); } }