crates/core/src/file_group/mod.rs (244 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
//! This module is for File Group related models and APIs.
//!
//! A file group is a set of base (data) files plus a set of log files that together make up
//! a unit for all operations.
pub mod base_file;
pub mod builder;
pub mod file_slice;
pub mod log_file;
pub mod reader;
use crate::error::CoreError;
use crate::file_group::base_file::BaseFile;
use crate::file_group::log_file::LogFile;
use crate::Result;
use file_slice::FileSlice;
use std::collections::BTreeMap;
use std::fmt;
use std::fmt::Formatter;
use std::hash::{Hash, Hasher};
use std::str::FromStr;
/// A [FileGroup] contains multiple [FileSlice]s within a partition,
/// and it can be uniquely identified by `file_id` across the table.
///
/// The [FileSlice]s are ordered by the commit timestamps that indicate the creation of the
/// [FileSlice].
///
/// Equality and hashing only consider `file_id` and `partition_path`; the contained
/// [FileSlice]s do not take part in either.
#[derive(Clone, Debug)]
pub struct FileGroup {
/// Identifier of this file group.
pub file_id: String,
/// Path of the partition this file group belongs to.
pub partition_path: String,
/// The [FileSlice]s of this group, keyed by commit timestamp. The [BTreeMap] keeps the
/// keys in ascending string order, which the "as of" lookups rely on.
pub file_slices: BTreeMap<String, FileSlice>,
}
/// Two [FileGroup]s are equal when both their `file_id` and `partition_path` match.
///
/// The contained file slices are deliberately left out of the comparison.
impl PartialEq for FileGroup {
    fn eq(&self, other: &Self) -> bool {
        let lhs = (&self.file_id, &self.partition_path);
        let rhs = (&other.file_id, &other.partition_path);
        lhs == rhs
    }
}
// Marker impl: `PartialEq` for `FileGroup` only compares two `String` fields, so the relation
// is reflexive and `FileGroup` can be used where full equivalence (`Eq`) is required.
impl Eq for FileGroup {}
/// Hashes the same fields that equality compares: `file_id` first, then `partition_path`.
impl Hash for FileGroup {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Feed each key field individually, in order, so no extra length prefix is hashed.
        for key_field in [&self.file_id, &self.partition_path] {
            key_field.hash(state);
        }
    }
}
/// Renders a [FileGroup] as `File Group: partition=<partition_path>, id=<file_id>`.
impl fmt::Display for FileGroup {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Write straight into the formatter instead of building a temporary `String` first.
        write!(
            f,
            "File Group: partition={}, id={}",
            self.partition_path, self.file_id
        )
    }
}
impl FileGroup {
/// Create a new [FileGroup] with the given `file_id` and `partition_path` with no [FileSlice]s.
pub fn new(file_id: String, partition_path: String) -> Self {
Self {
file_id,
partition_path,
file_slices: BTreeMap::new(),
}
}
/// Create a new [FileGroup] with a [BaseFile]'s file name.
pub fn new_with_base_file_name(file_name: &str, partition_path: &str) -> Result<Self> {
let base_file = BaseFile::from_str(file_name)?;
let file_id = base_file.file_id.clone();
let mut file_group = Self::new(file_id, partition_path.to_string());
file_group.add_base_file(base_file)?;
Ok(file_group)
}
pub fn merge(&mut self, other: &FileGroup) -> Result<()> {
if self != other {
return Err(CoreError::FileGroup(format!(
"Cannot merge different file groups: {self} and {other}",
)));
}
for (commit_timestamp, other_file_slice) in other.file_slices.iter() {
if let Some(existing_file_slice) = self.file_slices.get_mut(commit_timestamp) {
existing_file_slice.merge(other_file_slice)?;
} else {
self.file_slices
.insert(commit_timestamp.clone(), other_file_slice.clone());
}
}
Ok(())
}
/// Add a [BaseFile] based on the file name to the corresponding [FileSlice] in the [FileGroup].
pub fn add_base_file_from_name(&mut self, file_name: &str) -> Result<&Self> {
let base_file = BaseFile::from_str(file_name)?;
self.add_base_file(base_file)
}
/// Add a [BaseFile] to the corresponding [FileSlice] in the [FileGroup].
pub fn add_base_file(&mut self, base_file: BaseFile) -> Result<&Self> {
let commit_timestamp = base_file.commit_timestamp.as_str();
if self.file_slices.contains_key(commit_timestamp) {
Err(CoreError::FileGroup(format!(
"Instant time {commit_timestamp} is already present in File Group {}",
self.file_id
)))
} else {
self.file_slices.insert(
commit_timestamp.to_owned(),
FileSlice::new(base_file, self.partition_path.clone()),
);
Ok(self)
}
}
/// Add multiple [BaseFile]s to the corresponding [FileSlice]s in the [FileGroup].
pub fn add_base_files<I>(&mut self, base_files: I) -> Result<&Self>
where
I: IntoIterator<Item = BaseFile>,
{
for base_file in base_files {
self.add_base_file(base_file)?;
}
Ok(self)
}
/// Add a [LogFile] based on the file name to the corresponding [FileSlice] in the [FileGroup].
pub fn add_log_file_from_name(&mut self, file_name: &str) -> Result<&Self> {
let log_file = LogFile::from_str(file_name)?;
self.add_log_file(log_file)
}
/// Add multiple [LogFile]s based on the file names to the corresponding [FileSlice]s in the
/// [FileGroup].
pub fn add_log_files_from_names<I, S>(&mut self, log_file_names: I) -> Result<&Self>
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
for file_name in log_file_names {
self.add_log_file_from_name(file_name.as_ref())?;
}
Ok(self)
}
/// Add a [LogFile] to the corresponding [FileSlice] in the [FileGroup].
///
/// TODO: support adding log files to file group without base files.
pub fn add_log_file(&mut self, log_file: LogFile) -> Result<&Self> {
let commit_timestamp = log_file.base_commit_timestamp.as_str();
if let Some(file_slice) = self.file_slices.get_mut(commit_timestamp) {
file_slice.log_files.insert(log_file);
Ok(self)
} else {
Err(CoreError::FileGroup(format!(
"Instant time {commit_timestamp} not found in File Group {}",
self.file_id
)))
}
}
/// Add multiple [LogFile]s to the corresponding [FileSlice]s in the [FileGroup].
pub fn add_log_files<I>(&mut self, log_files: I) -> Result<&Self>
where
I: IntoIterator<Item = LogFile>,
{
for log_file in log_files {
self.add_log_file(log_file)?;
}
Ok(self)
}
/// Retrieves a reference to the closest [FileSlice] that was created on or before the given
/// `timestamp`.
pub fn get_file_slice_as_of(&self, timestamp: &str) -> Option<&FileSlice> {
let as_of = timestamp.to_string();
if let Some((_, file_slice)) = self.file_slices.range(..=as_of).next_back() {
Some(file_slice)
} else {
None
}
}
/// Retrieves a mutable reference to the closest [FileSlice] that was created on or before the
/// given `timestamp`.
pub fn get_file_slice_mut_as_of(&mut self, timestamp: &str) -> Option<&mut FileSlice> {
let as_of = timestamp.to_string();
if let Some((_, file_slice)) = self.file_slices.range_mut(..=as_of).next_back() {
Some(file_slice)
} else {
None
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::table::partition::EMPTY_PARTITION_PATH;

    /// Base files added out of order end up sorted by commit timestamp, and "as of" lookups
    /// resolve against that order.
    #[test]
    fn load_a_valid_file_group() {
        let mut fg = FileGroup::new(
            "5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(),
            EMPTY_PARTITION_PATH.to_string(),
        );
        // Check each add explicitly so a failure is reported at the offending call rather
        // than as a slice-count mismatch further down.
        assert!(fg
            .add_base_file_from_name(
                "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
            )
            .is_ok());
        assert!(fg
            .add_base_file_from_name(
                "5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402123035233.parquet",
            )
            .is_ok());
        assert_eq!(fg.file_slices.len(), 2);
        assert_eq!(fg.partition_path, EMPTY_PARTITION_PATH);
        let commit_times: Vec<&str> = fg.file_slices.keys().map(|k| k.as_str()).collect();
        assert_eq!(commit_times, vec!["20240402123035233", "20240402144910683"]);
        assert_eq!(
            fg.get_file_slice_as_of("20240402123035233")
                .unwrap()
                .base_file
                .commit_timestamp,
            "20240402123035233"
        );
        assert!(fg.get_file_slice_as_of("-1").is_none());
    }

    /// A second base file with an already-registered commit timestamp is rejected.
    #[test]
    fn add_base_file_with_same_commit_time_should_fail() {
        let mut fg = FileGroup::new(
            "5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(),
            EMPTY_PARTITION_PATH.to_string(),
        );
        let res1 = fg.add_base_file_from_name(
            "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
        );
        assert!(res1.is_ok());
        let res2 = fg.add_base_file_from_name(
            "5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402144910683.parquet",
        );
        assert!(res2.is_err());
        assert_eq!(res2.unwrap_err().to_string(), "File group error: Instant time 20240402144910683 is already present in File Group 5a226868-2934-4f84-a16f-55124630c68d-0");
    }

    /// `Display` prints the partition path followed by the file id, including for the
    /// empty partition path.
    #[test]
    fn test_file_group_display() {
        let file_group = FileGroup {
            file_id: "group123".to_string(),
            partition_path: "part/2023-01-01".to_string(),
            file_slices: BTreeMap::new(),
        };
        let display_string = file_group.to_string();
        assert_eq!(
            display_string,
            "File Group: partition=part/2023-01-01, id=group123"
        );

        let file_group_no_partition = FileGroup {
            file_id: "group456".to_string(),
            partition_path: EMPTY_PARTITION_PATH.to_string(),
            file_slices: BTreeMap::new(),
        };
        let display_string_no_partition = file_group_no_partition.to_string();
        assert_eq!(
            display_string_no_partition,
            "File Group: partition=, id=group456"
        );
    }
}