// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Table scan API.
mod cache;
use cache::*;
mod context;
use context::*;
mod task;
use std::sync::Arc;
use arrow_array::RecordBatch;
use futures::channel::mpsc::{channel, Sender};
use futures::stream::BoxStream;
use futures::{SinkExt, StreamExt, TryStreamExt};
pub use task::*;
use crate::arrow::ArrowReaderBuilder;
use crate::delete_file_index::DeleteFileIndex;
use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::io::FileIO;
use crate::runtime::spawn;
use crate::spec::{DataContentType, SnapshotRef};
use crate::table::Table;
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind, Result};
/// A stream of arrow [`RecordBatch`]es.
pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
/// Builder to create a table scan.
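///
/// Instances are created via [`Table::scan`] and turned into a [`TableScan`]
/// with [`TableScanBuilder::build`].
///
/// Illustrative sketch (assumes an already-loaded [`Table`]; the column names
/// `"x"` and `"y"` are placeholders):
///
/// ```ignore
/// use futures::TryStreamExt;
///
/// let scan = table
///     .scan()
///     .select(["x", "y"])
///     .with_batch_size(Some(1024))
///     .build()?;
/// let batches: Vec<_> = scan.to_arrow().await?.try_collect().await?;
/// ```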
pub struct TableScanBuilder<'a> {
table: &'a Table,
// Defaults to `None`, which means all columns are selected
column_names: Option<Vec<String>>,
snapshot_id: Option<i64>,
batch_size: Option<usize>,
case_sensitive: bool,
filter: Option<Predicate>,
concurrency_limit_data_files: usize,
concurrency_limit_manifest_entries: usize,
concurrency_limit_manifest_files: usize,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
// TODO: defaults to false for now whilst delete file processing
// is still being worked on but will switch to a default of true
// once this work is complete
delete_file_processing_enabled: bool,
}
impl<'a> TableScanBuilder<'a> {
pub(crate) fn new(table: &'a Table) -> Self {
let num_cpus = available_parallelism().get();
Self {
table,
column_names: None,
snapshot_id: None,
batch_size: None,
case_sensitive: true,
filter: None,
concurrency_limit_data_files: num_cpus,
concurrency_limit_manifest_entries: num_cpus,
concurrency_limit_manifest_files: num_cpus,
row_group_filtering_enabled: true,
row_selection_enabled: false,
delete_file_processing_enabled: false,
}
}
/// Sets the desired size of batches in the response
/// to something other than the default
pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
self.batch_size = batch_size;
self
}
/// Sets the scan's case sensitivity
pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self {
self.case_sensitive = case_sensitive;
self
}
/// Specifies a predicate to use as a filter
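///
/// Illustrative sketch (assumes `table` is an already-loaded [`Table`] with a
/// column named `"y"`):
///
/// ```ignore
/// use iceberg::expr::Reference;
/// use iceberg::spec::Datum;
///
/// let scan = table
///     .scan()
///     .with_filter(Reference::new("y").less_than(Datum::long(3)))
///     .build()?;
/// ```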
pub fn with_filter(mut self, predicate: Predicate) -> Self {
// calls rewrite_not to remove Not nodes, which must be absent
// when applying the manifest evaluator
self.filter = Some(predicate.rewrite_not());
self
}
/// Select all columns.
pub fn select_all(mut self) -> Self {
self.column_names = None;
self
}
/// Select empty columns.
pub fn select_empty(mut self) -> Self {
self.column_names = Some(vec![]);
self
}
/// Select some columns of the table.
pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
self.column_names = Some(
column_names
.into_iter()
.map(|item| item.to_string())
.collect(),
);
self
}
/// Sets the snapshot to scan. When not set, the current snapshot is used.
pub fn snapshot_id(mut self, snapshot_id: i64) -> Self {
self.snapshot_id = Some(snapshot_id);
self
}
/// Sets the concurrency limit for manifest files, manifest entries,
/// and data files for this scan
pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
self.concurrency_limit_manifest_files = limit;
self.concurrency_limit_manifest_entries = limit;
self.concurrency_limit_data_files = limit;
self
}
/// Sets the data file concurrency limit for this scan
pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
self.concurrency_limit_data_files = limit;
self
}
/// Sets the manifest entry concurrency limit for this scan
pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self {
self.concurrency_limit_manifest_entries = limit;
self
}
/// Determines whether to enable row group filtering.
/// When enabled, if a read is performed with a filter predicate,
/// then the metadata for each row group in the parquet file is
/// evaluated against the filter predicate, and row groups
/// that can't contain matching rows will be skipped entirely.
///
/// Defaults to enabled, as it generally improves performance or
/// leaves it unchanged, and is unlikely to degrade it.
pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
self.row_group_filtering_enabled = row_group_filtering_enabled;
self
}
/// Determines whether to enable row selection.
/// When enabled, if a read is performed with a filter predicate,
/// then (for row groups that have not been skipped) the page index
/// for each row group in a parquet file is parsed and evaluated
/// against the filter predicate to determine whether ranges of rows
/// within a row group can be skipped, based upon the page-level
/// statistics for each column.
///
/// Defaults to disabled. Enabling it requires parsing the parquet page
/// index, which can be slow enough that the cost of parsing outweighs any
/// gains from the reduced number of rows that need scanning.
/// It is recommended to experiment with the Iceberg settings for partitioning,
/// sorting, row group size, page size, and page row limit on the table being
/// scanned in order to get the best performance from row selection.
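///
/// Illustrative sketch of enabling both row group filtering and row selection
/// (`predicate` is a placeholder for any [`Predicate`]):
///
/// ```ignore
/// let scan = table
///     .scan()
///     .with_filter(predicate)
///     .with_row_group_filtering_enabled(true)
///     .with_row_selection_enabled(true)
///     .build()?;
/// ```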
pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
self.row_selection_enabled = row_selection_enabled;
self
}
/// Determines whether to enable delete file processing (currently disabled by default)
///
/// When disabled, delete files are ignored.
pub fn with_delete_file_processing_enabled(
mut self,
delete_file_processing_enabled: bool,
) -> Self {
self.delete_file_processing_enabled = delete_file_processing_enabled;
self
}
/// Build the table scan.
pub fn build(self) -> Result<TableScan> {
let snapshot = match self.snapshot_id {
Some(snapshot_id) => self
.table
.metadata()
.snapshot_by_id(snapshot_id)
.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Snapshot with id {} not found", snapshot_id),
)
})?
.clone(),
None => {
let Some(current_snapshot) = self.table.metadata().current_snapshot() else {
return Ok(TableScan {
batch_size: self.batch_size,
column_names: self.column_names,
file_io: self.table.file_io().clone(),
plan_context: None,
concurrency_limit_data_files: self.concurrency_limit_data_files,
concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
row_group_filtering_enabled: self.row_group_filtering_enabled,
row_selection_enabled: self.row_selection_enabled,
delete_file_processing_enabled: self.delete_file_processing_enabled,
});
};
current_snapshot.clone()
}
};
let schema = snapshot.schema(self.table.metadata())?;
// Check that all column names exist in the schema.
if let Some(column_names) = self.column_names.as_ref() {
for column_name in column_names {
if schema.field_by_name(column_name).is_none() {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Column {} not found in table. Schema: {}",
column_name, schema
),
));
}
}
}
let mut field_ids = vec![];
let column_names = self.column_names.clone().unwrap_or_else(|| {
schema
.as_struct()
.fields()
.iter()
.map(|f| f.name.clone())
.collect()
});
for column_name in column_names.iter() {
let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!(
"Column {} not found in table. Schema: {}",
column_name, schema
),
)
})?;
schema
.as_struct()
.field_by_id(field_id)
.ok_or_else(|| {
Error::new(
ErrorKind::FeatureUnsupported,
format!(
"Column {} is not a direct child of schema but a nested field, which is not supported now. Schema: {}",
column_name, schema
),
)
})?;
field_ids.push(field_id);
}
let snapshot_bound_predicate = if let Some(ref predicates) = self.filter {
Some(predicates.bind(schema.clone(), true)?)
} else {
None
};
let plan_context = PlanContext {
snapshot,
table_metadata: self.table.metadata_ref(),
snapshot_schema: schema,
case_sensitive: self.case_sensitive,
predicate: self.filter.map(Arc::new),
snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
object_cache: self.table.object_cache(),
field_ids: Arc::new(field_ids),
partition_filter_cache: Arc::new(PartitionFilterCache::new()),
manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
};
Ok(TableScan {
batch_size: self.batch_size,
column_names: self.column_names,
file_io: self.table.file_io().clone(),
plan_context: Some(plan_context),
concurrency_limit_data_files: self.concurrency_limit_data_files,
concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
row_group_filtering_enabled: self.row_group_filtering_enabled,
row_selection_enabled: self.row_selection_enabled,
delete_file_processing_enabled: self.delete_file_processing_enabled,
})
}
}
/// Table scan.
#[derive(Debug)]
pub struct TableScan {
/// A [`PlanContext`], if this table has at least one snapshot, otherwise `None`.
///
/// If this is `None`, then the scan contains no rows.
plan_context: Option<PlanContext>,
batch_size: Option<usize>,
file_io: FileIO,
column_names: Option<Vec<String>>,
/// The maximum number of manifest files that will be
/// retrieved from [`FileIO`] concurrently
concurrency_limit_manifest_files: usize,
/// The maximum number of [`ManifestEntry`]s that will
/// be processed in parallel
concurrency_limit_manifest_entries: usize,
/// The maximum number of data files that will
/// be read concurrently
concurrency_limit_data_files: usize,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
delete_file_processing_enabled: bool,
}
impl TableScan {
/// Returns a stream of [`FileScanTask`]s.
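///
/// Illustrative sketch of collecting the planned tasks into a `Vec` (assumes
/// an already-built [`TableScan`] named `scan`):
///
/// ```ignore
/// use futures::TryStreamExt;
///
/// let tasks: Vec<FileScanTask> = scan.plan_files().await?.try_collect().await?;
/// ```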
pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
let Some(plan_context) = self.plan_context.as_ref() else {
return Ok(Box::pin(futures::stream::empty()));
};
let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;
// used to stream ManifestEntryContexts between stages of the file plan operation
let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
channel(concurrency_limit_manifest_files);
let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
channel(concurrency_limit_manifest_files);
// used to stream the results back to the caller
let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);
let delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<DeleteFileContext>)> =
if self.delete_file_processing_enabled {
Some(DeleteFileIndex::new())
} else {
None
};
let manifest_list = plan_context.get_manifest_list().await?;
// Get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
// whose partitions cannot match this scan's filter
let manifest_file_contexts = plan_context.build_manifest_file_contexts(
manifest_list,
manifest_entry_data_ctx_tx,
delete_file_idx_and_tx.as_ref().map(|(delete_file_idx, _)| {
(delete_file_idx.clone(), manifest_entry_delete_ctx_tx)
}),
)?;
let mut channel_for_manifest_error = file_scan_task_tx.clone();
// Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s
spawn(async move {
let result = futures::stream::iter(manifest_file_contexts)
.try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
ctx.fetch_manifest_and_stream_manifest_entries().await
})
.await;
if let Err(error) = result {
let _ = channel_for_manifest_error.send(Err(error)).await;
}
});
let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
if let Some((_, delete_file_tx)) = delete_file_idx_and_tx {
let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();
// Process the delete file [`ManifestEntry`] stream in parallel
spawn(async move {
let result = manifest_entry_delete_ctx_rx
.map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
.try_for_each_concurrent(
concurrency_limit_manifest_entries,
|(manifest_entry_context, tx)| async move {
spawn(async move {
Self::process_delete_manifest_entry(manifest_entry_context, tx)
.await
})
.await
},
)
.await;
if let Err(error) = result {
let _ = channel_for_delete_manifest_entry_error
.send(Err(error))
.await;
}
})
.await;
}
// Process the data file [`ManifestEntry`] stream in parallel
spawn(async move {
let result = manifest_entry_data_ctx_rx
.map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
.try_for_each_concurrent(
concurrency_limit_manifest_entries,
|(manifest_entry_context, tx)| async move {
spawn(async move {
Self::process_data_manifest_entry(manifest_entry_context, tx).await
})
.await
},
)
.await;
if let Err(error) = result {
let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
}
});
Ok(file_scan_task_rx.boxed())
}
/// Returns an [`ArrowRecordBatchStream`].
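///
/// Illustrative sketch of draining the stream batch by batch (assumes an
/// already-built [`TableScan`] named `scan`):
///
/// ```ignore
/// use futures::StreamExt;
///
/// let mut batch_stream = scan.to_arrow().await?;
/// while let Some(batch) = batch_stream.next().await {
///     println!("read {} rows", batch?.num_rows());
/// }
/// ```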
pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
.with_data_file_concurrency_limit(self.concurrency_limit_data_files)
.with_row_group_filtering_enabled(self.row_group_filtering_enabled)
.with_row_selection_enabled(self.row_selection_enabled);
if let Some(batch_size) = self.batch_size {
arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
}
arrow_reader_builder
.build()
.read(self.plan_files().await?)
.await
}
/// Returns a reference to the column names of the table scan.
pub fn column_names(&self) -> Option<&[String]> {
self.column_names.as_deref()
}
/// Returns a reference to the snapshot of the table scan.
pub fn snapshot(&self) -> Option<&SnapshotRef> {
self.plan_context.as_ref().map(|x| &x.snapshot)
}
async fn process_data_manifest_entry(
manifest_entry_context: ManifestEntryContext,
mut file_scan_task_tx: Sender<Result<FileScanTask>>,
) -> Result<()> {
// skip processing this manifest entry if it has been marked as deleted
if !manifest_entry_context.manifest_entry.is_alive() {
return Ok(());
}
// abort the plan if we encounter a manifest entry for a delete file
if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
"Encountered an entry for a delete file in a data file manifest",
));
}
if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
let BoundPredicates {
snapshot_bound_predicate,
partition_bound_predicate,
} = bound_predicates.as_ref();
let expression_evaluator_cache =
manifest_entry_context.expression_evaluator_cache.as_ref();
let expression_evaluator = expression_evaluator_cache.get(
manifest_entry_context.partition_spec_id,
partition_bound_predicate,
)?;
// skip any data file whose partition data indicates that it can't contain
// any data that matches this scan's filter
if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
return Ok(());
}
// skip any data file whose metrics don't match this scan's filter
if !InclusiveMetricsEvaluator::eval(
snapshot_bound_predicate,
manifest_entry_context.manifest_entry.data_file(),
false,
)? {
return Ok(());
}
}
// congratulations! the manifest entry has made its way through the
// entire plan without getting filtered out. Create a corresponding
// FileScanTask and push it to the result stream
file_scan_task_tx
.send(Ok(manifest_entry_context.into_file_scan_task().await?))
.await?;
Ok(())
}
async fn process_delete_manifest_entry(
manifest_entry_context: ManifestEntryContext,
mut delete_file_ctx_tx: Sender<DeleteFileContext>,
) -> Result<()> {
// skip processing this manifest entry if it has been marked as deleted
if !manifest_entry_context.manifest_entry.is_alive() {
return Ok(());
}
// abort the plan if we encounter a manifest entry that is not for a delete file
if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
"Encountered an entry for a data file in a delete manifest",
));
}
if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates {
let expression_evaluator_cache =
manifest_entry_context.expression_evaluator_cache.as_ref();
let expression_evaluator = expression_evaluator_cache.get(
manifest_entry_context.partition_spec_id,
&bound_predicates.partition_bound_predicate,
)?;
// skip any delete file whose partition data indicates that it can't contain
// any data that matches this scan's filter
if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
return Ok(());
}
}
delete_file_ctx_tx
.send(DeleteFileContext {
manifest_entry: manifest_entry_context.manifest_entry.clone(),
partition_spec_id: manifest_entry_context.partition_spec_id,
})
.await?;
Ok(())
}
}
pub(crate) struct BoundPredicates {
partition_bound_predicate: BoundPredicate,
snapshot_bound_predicate: BoundPredicate,
}
#[cfg(test)]
pub mod tests {
//! shared tests for the table scan API
#![allow(missing_docs)]
use std::collections::HashMap;
use std::fs;
use std::fs::File;
use std::sync::Arc;
use arrow_array::{
ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
};
use futures::{stream, TryStreamExt};
use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use tempfile::TempDir;
use tera::{Context, Tera};
use uuid::Uuid;
use crate::arrow::ArrowReaderBuilder;
use crate::expr::{BoundPredicate, Reference};
use crate::io::{FileIO, OutputFile};
use crate::scan::FileScanTask;
use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec,
PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
};
use crate::table::Table;
use crate::TableIdent;
pub struct TableTestFixture {
pub table_location: String,
pub table: Table,
}
impl TableTestFixture {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().join("table1");
let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
let table_metadata1_location = table_location.join("metadata/v1.json");
let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
.unwrap()
.build()
.unwrap();
let table_metadata = {
let template_json_str = fs::read_to_string(format!(
"{}/testdata/example_table_metadata_v2.json",
env!("CARGO_MANIFEST_DIR")
))
.unwrap();
let mut context = Context::new();
context.insert("table_location", &table_location);
context.insert("manifest_list_1_location", &manifest_list1_location);
context.insert("manifest_list_2_location", &manifest_list2_location);
context.insert("table_metadata_1_location", &table_metadata1_location);
let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap();
serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
};
let table = Table::builder()
.metadata(table_metadata)
.identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
.file_io(file_io.clone())
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
.build()
.unwrap();
Self {
table_location: table_location.to_str().unwrap().to_string(),
table,
}
}
#[allow(clippy::new_without_default)]
pub fn new_empty() -> Self {
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().join("table1");
let table_metadata1_location = table_location.join("metadata/v1.json");
let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
.unwrap()
.build()
.unwrap();
let table_metadata = {
let template_json_str = fs::read_to_string(format!(
"{}/testdata/example_empty_table_metadata_v2.json",
env!("CARGO_MANIFEST_DIR")
))
.unwrap();
let mut context = Context::new();
context.insert("table_location", &table_location);
context.insert("table_metadata_1_location", &table_metadata1_location);
let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap();
serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
};
let table = Table::builder()
.metadata(table_metadata)
.identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
.file_io(file_io.clone())
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
.build()
.unwrap();
Self {
table_location: table_location.to_str().unwrap().to_string(),
table,
}
}
pub fn new_unpartitioned() -> Self {
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().join("table1");
let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
let table_metadata1_location = table_location.join("metadata/v1.json");
let file_io = FileIO::from_path(table_location.to_str().unwrap())
.unwrap()
.build()
.unwrap();
let mut table_metadata = {
let template_json_str = fs::read_to_string(format!(
"{}/testdata/example_table_metadata_v2.json",
env!("CARGO_MANIFEST_DIR")
))
.unwrap();
let mut context = Context::new();
context.insert("table_location", &table_location);
context.insert("manifest_list_1_location", &manifest_list1_location);
context.insert("manifest_list_2_location", &manifest_list2_location);
context.insert("table_metadata_1_location", &table_metadata1_location);
let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap();
serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
};
table_metadata.default_spec = Arc::new(PartitionSpec::unpartition_spec());
table_metadata.partition_specs.clear();
table_metadata.default_partition_type = StructType::new(vec![]);
table_metadata
.partition_specs
.insert(0, table_metadata.default_spec.clone());
let table = Table::builder()
.metadata(table_metadata)
.identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
.file_io(file_io.clone())
.metadata_location(table_metadata1_location.to_str().unwrap())
.build()
.unwrap();
Self {
table_location: table_location.to_str().unwrap().to_string(),
table,
}
}
fn next_manifest_file(&self) -> OutputFile {
self.table
.file_io()
.new_output(format!(
"{}/metadata/manifest_{}.avro",
self.table_location,
Uuid::new_v4()
))
.unwrap()
}
pub async fn setup_manifest_files(&mut self) {
let current_snapshot = self.table.metadata().current_snapshot().unwrap();
let parent_snapshot = current_snapshot
.parent_snapshot(self.table.metadata())
.unwrap();
let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
let current_partition_spec = self.table.metadata().default_partition_spec();
// Write data files
let mut writer = ManifestWriterBuilder::new(
self.next_manifest_file(),
Some(current_snapshot.snapshot_id()),
vec![],
current_schema.clone(),
current_partition_spec.as_ref().clone(),
)
.build_v2_data();
writer
.add_entry(
ManifestEntry::builder()
.status(ManifestStatus::Added)
.data_file(
DataFileBuilder::default()
.partition_spec_id(0)
.content(DataContentType::Data)
.file_path(format!("{}/1.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.key_metadata(None)
.build()
.unwrap(),
)
.build(),
)
.unwrap();
writer
.add_delete_entry(
ManifestEntry::builder()
.status(ManifestStatus::Deleted)
.snapshot_id(parent_snapshot.snapshot_id())
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
DataFileBuilder::default()
.partition_spec_id(0)
.content(DataContentType::Data)
.file_path(format!("{}/2.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(200))]))
.build()
.unwrap(),
)
.build(),
)
.unwrap();
writer
.add_existing_entry(
ManifestEntry::builder()
.status(ManifestStatus::Existing)
.snapshot_id(parent_snapshot.snapshot_id())
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
DataFileBuilder::default()
.partition_spec_id(0)
.content(DataContentType::Data)
.file_path(format!("{}/3.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(300))]))
.build()
.unwrap(),
)
.build(),
)
.unwrap();
let data_file_manifest = writer.write_manifest_file().await.unwrap();
// Write to manifest list
let mut manifest_list_write = ManifestListWriter::v2(
self.table
.file_io()
.new_output(current_snapshot.manifest_list())
.unwrap(),
current_snapshot.snapshot_id(),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
);
manifest_list_write
.add_manifests(vec![data_file_manifest].into_iter())
.unwrap();
manifest_list_write.close().await.unwrap();
// prepare data
let schema = {
let fields = vec![
arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"2".to_string(),
)])),
arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"3".to_string(),
)])),
arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"4".to_string(),
)])),
arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"5".to_string(),
)])),
arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"6".to_string(),
)])),
arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"7".to_string(),
)])),
arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"8".to_string(),
)])),
];
Arc::new(arrow_schema::Schema::new(fields))
};
// x: [1, 1, 1, 1, ...]
let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
let mut values = vec![2; 512];
values.append(vec![3; 200].as_mut());
values.append(vec![4; 300].as_mut());
values.append(vec![5; 12].as_mut());
// y: [2, 2, 2, 2, ..., 3, 3, 3, 3, ..., 4, 4, 4, 4, ..., 5, 5, 5, 5]
let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
let mut values = vec![3; 512];
values.append(vec![4; 512].as_mut());
// z: [3, 3, 3, 3, ..., 4, 4, 4, 4]
let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
// a: ["Apache", "Apache", "Apache", ..., "Iceberg", "Iceberg", "Iceberg"]
let mut values = vec!["Apache"; 512];
values.append(vec!["Iceberg"; 512].as_mut());
let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
// dbl: [100.0, 100.0, ..., 150.0, 150.0, ..., 200.0, 200.0, 200.0]
let mut values = vec![100.0f64; 512];
values.append(vec![150.0f64; 12].as_mut());
values.append(vec![200.0f64; 500].as_mut());
let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;
// i32: [100, 100, ..., 150, 150, ..., 200, 200, 200]
let mut values = vec![100i32; 512];
values.append(vec![150i32; 12].as_mut());
values.append(vec![200i32; 500].as_mut());
let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;
// i64: [100, 100, ..., 150, 150, ..., 200, 200, 200]
let mut values = vec![100i64; 512];
values.append(vec![150i64; 12].as_mut());
values.append(vec![200i64; 500].as_mut());
let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
// bool: [false, false, false, ..., true, true, true]
let mut values = vec![false; 512];
values.append(vec![true; 512].as_mut());
let values: BooleanArray = values.into();
let col8 = Arc::new(values) as ArrayRef;
let to_write = RecordBatch::try_new(schema.clone(), vec![
col1, col2, col3, col4, col5, col6, col7, col8,
])
.unwrap();
// Write the Parquet files
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
for n in 1..=3 {
let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
let mut writer =
ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
writer.write(&to_write).expect("Writing batch");
// writer must be closed to write footer
writer.close().unwrap();
}
}
pub async fn setup_unpartitioned_manifest_files(&mut self) {
let current_snapshot = self.table.metadata().current_snapshot().unwrap();
let parent_snapshot = current_snapshot
.parent_snapshot(self.table.metadata())
.unwrap();
let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
let current_partition_spec = Arc::new(PartitionSpec::unpartition_spec());
// Write data files using an empty partition for unpartitioned tables.
let mut writer = ManifestWriterBuilder::new(
self.next_manifest_file(),
Some(current_snapshot.snapshot_id()),
vec![],
current_schema.clone(),
current_partition_spec.as_ref().clone(),
)
.build_v2_data();
// Create an empty partition value.
let empty_partition = Struct::empty();
writer
.add_entry(
ManifestEntry::builder()
.status(ManifestStatus::Added)
.data_file(
DataFileBuilder::default()
.partition_spec_id(0)
.content(DataContentType::Data)
.file_path(format!("{}/1.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(empty_partition.clone())
.key_metadata(None)
.build()
.unwrap(),
)
.build(),
)
.unwrap();
writer
.add_delete_entry(
ManifestEntry::builder()
.status(ManifestStatus::Deleted)
.snapshot_id(parent_snapshot.snapshot_id())
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
DataFileBuilder::default()
.partition_spec_id(0)
.content(DataContentType::Data)
.file_path(format!("{}/2.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(empty_partition.clone())
.build()
.unwrap(),
)
.build(),
)
.unwrap();
writer
.add_existing_entry(
ManifestEntry::builder()
.status(ManifestStatus::Existing)
.snapshot_id(parent_snapshot.snapshot_id())
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
DataFileBuilder::default()
.partition_spec_id(0)
.content(DataContentType::Data)
.file_path(format!("{}/3.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(empty_partition.clone())
.build()
.unwrap(),
)
.build(),
)
.unwrap();
let data_file_manifest = writer.write_manifest_file().await.unwrap();
// Write to manifest list
let mut manifest_list_write = ManifestListWriter::v2(
self.table
.file_io()
.new_output(current_snapshot.manifest_list())
.unwrap(),
current_snapshot.snapshot_id(),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
);
manifest_list_write
.add_manifests(vec![data_file_manifest].into_iter())
.unwrap();
manifest_list_write.close().await.unwrap();
// prepare data for parquet files
let schema = {
let fields = vec![
arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"2".to_string(),
)])),
arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"3".to_string(),
)])),
arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"4".to_string(),
)])),
arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"5".to_string(),
)])),
arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"6".to_string(),
)])),
arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"7".to_string(),
)])),
arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"8".to_string(),
)])),
];
Arc::new(arrow_schema::Schema::new(fields))
};
// Build the arrays for the RecordBatch
let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
let mut values = vec![2; 512];
values.append(vec![3; 200].as_mut());
values.append(vec![4; 300].as_mut());
values.append(vec![5; 12].as_mut());
let col2 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
let mut values = vec![3; 512];
values.append(vec![4; 512].as_mut());
let col3 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
let mut values = vec!["Apache"; 512];
values.append(vec!["Iceberg"; 512].as_mut());
let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
let mut values = vec![100.0f64; 512];
values.append(vec![150.0f64; 12].as_mut());
values.append(vec![200.0f64; 500].as_mut());
let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef;
let mut values = vec![100i32; 512];
values.append(vec![150i32; 12].as_mut());
values.append(vec![200i32; 500].as_mut());
let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef;
let mut values = vec![100i64; 512];
values.append(vec![150i64; 12].as_mut());
values.append(vec![200i64; 500].as_mut());
let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
let mut values = vec![false; 512];
values.append(vec![true; 512].as_mut());
let values: BooleanArray = values.into();
let col8 = Arc::new(values) as ArrayRef;
let to_write = RecordBatch::try_new(schema.clone(), vec![
col1, col2, col3, col4, col5, col6, col7, col8,
])
.unwrap();
// Write the Parquet files
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
for n in 1..=3 {
let file = File::create(format!("{}/{}.parquet", &self.table_location, n)).unwrap();
let mut writer =
ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
writer.write(&to_write).expect("Writing batch");
// writer must be closed to write footer
writer.close().unwrap();
}
}
}
#[test]
fn test_table_scan_columns() {
let table = TableTestFixture::new().table;
let table_scan = table.scan().select(["x", "y"]).build().unwrap();
assert_eq!(
Some(vec!["x".to_string(), "y".to_string()]),
table_scan.column_names
);
let table_scan = table
.scan()
.select(["x", "y"])
.select(["z"])
.build()
.unwrap();
assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
}
#[test]
fn test_select_all() {
let table = TableTestFixture::new().table;
let table_scan = table.scan().select_all().build().unwrap();
assert!(table_scan.column_names.is_none());
}
#[test]
fn test_select_no_exist_column() {
let table = TableTestFixture::new().table;
let table_scan = table.scan().select(["x", "y", "z", "a", "b"]).build();
assert!(table_scan.is_err());
}
#[test]
fn test_table_scan_default_snapshot_id() {
let table = TableTestFixture::new().table;
let table_scan = table.scan().build().unwrap();
assert_eq!(
table.metadata().current_snapshot().unwrap().snapshot_id(),
table_scan.snapshot().unwrap().snapshot_id()
);
}
#[test]
fn test_table_scan_non_exist_snapshot_id() {
let table = TableTestFixture::new().table;
let table_scan = table.scan().snapshot_id(1024).build();
assert!(table_scan.is_err());
}
#[test]
fn test_table_scan_with_snapshot_id() {
let table = TableTestFixture::new().table;
let table_scan = table
.scan()
.snapshot_id(3051729675574597004)
.with_row_selection_enabled(true)
.build()
.unwrap();
assert_eq!(
table_scan.snapshot().unwrap().snapshot_id(),
3051729675574597004
);
}
#[tokio::test]
async fn test_plan_files_on_table_without_any_snapshots() {
let table = TableTestFixture::new_empty().table;
let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert!(batches.is_empty());
}
#[tokio::test]
async fn test_plan_files_no_deletions() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
// Create table scan for current snapshot and plan files
let table_scan = fixture
.table
.scan()
.with_row_selection_enabled(true)
.build()
.unwrap();
let mut tasks = table_scan
.plan_files()
.await
.unwrap()
.try_fold(vec![], |mut acc, task| async move {
acc.push(task);
Ok(acc)
})
.await
.unwrap();
assert_eq!(tasks.len(), 2);
tasks.sort_by_key(|t| t.data_file_path.to_string());
// Check first task is added data file
assert_eq!(
tasks[0].data_file_path,
format!("{}/1.parquet", &fixture.table_location)
);
// Check second task is existing data file
assert_eq!(
tasks[1].data_file_path,
format!("{}/3.parquet", &fixture.table_location)
);
}
#[tokio::test]
async fn test_open_parquet_no_deletions() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
// Create table scan for current snapshot and plan files
let table_scan = fixture
.table
.scan()
.with_row_selection_enabled(true)
.build()
.unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
let col = batches[0].column_by_name("x").unwrap();
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(int64_arr.value(0), 1);
}
#[tokio::test]
async fn test_open_parquet_no_deletions_by_separate_reader() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
// Create table scan for current snapshot and plan files
let table_scan = fixture
.table
.scan()
.with_row_selection_enabled(true)
.build()
.unwrap();
let mut plan_task: Vec<_> = table_scan
.plan_files()
.await
.unwrap()
.try_collect()
.await
.unwrap();
assert_eq!(plan_task.len(), 2);
let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
let batch_stream = reader
.clone()
.read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
.await
.unwrap();
let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap();
let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build();
let batch_stream = reader
.read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))])))
.await
.unwrap();
let batch_2: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batch_1, batch_2);
}
#[tokio::test]
async fn test_open_parquet_with_projection() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
// Create table scan for current snapshot and plan files
let table_scan = fixture
.table
.scan()
.select(["x", "z"])
.with_row_selection_enabled(true)
.build()
.unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches[0].num_columns(), 2);
let col1 = batches[0].column_by_name("x").unwrap();
let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(int64_arr.value(0), 1);
let col2 = batches[0].column_by_name("z").unwrap();
let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(int64_arr.value(0), 3);
// test empty scan
let table_scan = fixture.table.scan().select_empty().build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches[0].num_columns(), 0);
assert_eq!(batches[0].num_rows(), 1024);
}
#[tokio::test]
async fn test_filter_on_arrow_lt() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
// Filter: y < 3
let mut builder = fixture.table.scan();
let predicate = Reference::new("y").less_than(Datum::long(3));
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches[0].num_rows(), 512);
let col = batches[0].column_by_name("x").unwrap();
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(int64_arr.value(0), 1);
let col = batches[0].column_by_name("y").unwrap();
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(int64_arr.value(0), 2);
}
#[tokio::test]
async fn test_filter_on_arrow_gt_eq() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
// Filter: y >= 5
let mut builder = fixture.table.scan();
let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5));
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches[0].num_rows(), 12);
let col = batches[0].column_by_name("x").unwrap();
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(int64_arr.value(0), 1);
let col = batches[0].column_by_name("y").unwrap();
let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(int64_arr.value(0), 5);
}
#[tokio::test]
async fn test_filter_double_eq() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
// Filter: dbl == 150.0
let mut builder = fixture.table.scan();
let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64));
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches.len(), 2);
assert_eq!(batches[0].num_rows(), 12);
let col = batches[0].column_by_name("dbl").unwrap();
let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
assert_eq!(f64_arr.value(1), 150.0f64);
}
#[tokio::test]
async fn test_filter_int_eq() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
// Filter: i32 == 150
let mut builder = fixture.table.scan();
let predicate = Reference::new("i32").equal_to(Datum::int(150i32));
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches.len(), 2);
assert_eq!(batches[0].num_rows(), 12);
let col = batches[0].column_by_name("i32").unwrap();
let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
assert_eq!(i32_arr.value(1), 150i32);
}
#[tokio::test]
async fn test_filter_long_eq() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
// Filter: i64 == 150
let mut builder = fixture.table.scan();
let predicate = Reference::new("i64").equal_to(Datum::long(150i64));
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches.len(), 2);
assert_eq!(batches[0].num_rows(), 12);
let col = batches[0].column_by_name("i64").unwrap();
let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(i64_arr.value(1), 150i64);
}
#[tokio::test]
async fn test_filter_bool_eq() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
// Filter: bool == true
let mut builder = fixture.table.scan();
let predicate = Reference::new("bool").equal_to(Datum::bool(true));
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches.len(), 2);
assert_eq!(batches[0].num_rows(), 512);
let col = batches[0].column_by_name("bool").unwrap();
let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
assert!(bool_arr.value(1));
}
#[tokio::test]
async fn test_filter_on_arrow_is_null() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
// Filter: y is null
let mut builder = fixture.table.scan();
let predicate = Reference::new("y").is_null();
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches.len(), 0);
}
#[tokio::test]
async fn test_filter_on_arrow_is_not_null() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
// Filter: y is not null
let mut builder = fixture.table.scan();
let predicate = Reference::new("y").is_not_null();
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches[0].num_rows(), 1024);
}
#[tokio::test]
async fn test_filter_on_arrow_lt_and_gt() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
// Filter: y < 5 AND z >= 4
let mut builder = fixture.table.scan();
let predicate = Reference::new("y")
.less_than(Datum::long(5))
.and(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches[0].num_rows(), 500);
let col = batches[0].column_by_name("x").unwrap();
let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 500])) as ArrayRef;
assert_eq!(col, &expected_x);
let col = batches[0].column_by_name("y").unwrap();
let mut values = vec![];
values.append(vec![3; 200].as_mut());
values.append(vec![4; 300].as_mut());
let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
assert_eq!(col, &expected_y);
let col = batches[0].column_by_name("z").unwrap();
let expected_z = Arc::new(Int64Array::from_iter_values(vec![4; 500])) as ArrayRef;
assert_eq!(col, &expected_z);
}
#[tokio::test]
async fn test_filter_on_arrow_lt_or_gt() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
// Filter: y < 5 OR z >= 4
let mut builder = fixture.table.scan();
let predicate = Reference::new("y")
.less_than(Datum::long(5))
.or(Reference::new("z").greater_than_or_equal_to(Datum::long(4)));
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches[0].num_rows(), 1024);
let col = batches[0].column_by_name("x").unwrap();
let expected_x = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
assert_eq!(col, &expected_x);
let col = batches[0].column_by_name("y").unwrap();
let mut values = vec![2; 512];
values.append(vec![3; 200].as_mut());
values.append(vec![4; 300].as_mut());
values.append(vec![5; 12].as_mut());
let expected_y = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
assert_eq!(col, &expected_y);
let col = batches[0].column_by_name("z").unwrap();
let mut values = vec![3; 512];
values.append(vec![4; 512].as_mut());
let expected_z = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef;
assert_eq!(col, &expected_z);
}
#[tokio::test]
async fn test_filter_on_arrow_startswith() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
// Filter: a STARTSWITH "Ice"
let mut builder = fixture.table.scan();
let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches[0].num_rows(), 512);
let col = batches[0].column_by_name("a").unwrap();
let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(string_arr.value(0), "Iceberg");
}
#[tokio::test]
async fn test_filter_on_arrow_not_startswith() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
// Filter: a NOT STARTSWITH "Ice"
let mut builder = fixture.table.scan();
let predicate = Reference::new("a").not_starts_with(Datum::string("Ice"));
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches[0].num_rows(), 512);
let col = batches[0].column_by_name("a").unwrap();
let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(string_arr.value(0), "Apache");
}
#[tokio::test]
async fn test_filter_on_arrow_in() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
// Filter: a IN ("Sioux", "Iceberg")
let mut builder = fixture.table.scan();
let predicate =
Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches[0].num_rows(), 512);
let col = batches[0].column_by_name("a").unwrap();
let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(string_arr.value(0), "Iceberg");
}
#[tokio::test]
async fn test_filter_on_arrow_not_in() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;
// Filter: a NOT IN ("Sioux", "Iceberg")
let mut builder = fixture.table.scan();
let predicate =
Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]);
builder = builder
.with_filter(predicate)
.with_row_selection_enabled(true);
let table_scan = builder.build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
assert_eq!(batches[0].num_rows(), 512);
let col = batches[0].column_by_name("a").unwrap();
let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(string_arr.value(0), "Apache");
}
#[test]
fn test_file_scan_task_serialize_deserialize() {
let test_fn = |task: FileScanTask| {
let serialized = serde_json::to_string(&task).unwrap();
let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();
assert_eq!(task.data_file_path, deserialized.data_file_path);
assert_eq!(task.start, deserialized.start);
assert_eq!(task.length, deserialized.length);
assert_eq!(task.project_field_ids, deserialized.project_field_ids);
assert_eq!(task.predicate, deserialized.predicate);
assert_eq!(task.schema, deserialized.schema);
};
// without predicate
let schema = Arc::new(
Schema::builder()
.with_fields(vec![Arc::new(NestedField::required(
1,
"x",
Type::Primitive(PrimitiveType::Binary),
))])
.build()
.unwrap(),
);
let task = FileScanTask {
data_file_path: "data_file_path".to_string(),
data_file_content: DataContentType::Data,
start: 0,
length: 100,
project_field_ids: vec![1, 2, 3],
predicate: None,
schema: schema.clone(),
record_count: Some(100),
data_file_format: DataFileFormat::Parquet,
deletes: vec![],
};
test_fn(task);
// with predicate
let task = FileScanTask {
data_file_path: "data_file_path".to_string(),
data_file_content: DataContentType::Data,
start: 0,
length: 100,
project_field_ids: vec![1, 2, 3],
predicate: Some(BoundPredicate::AlwaysTrue),
schema,
record_count: None,
data_file_format: DataFileFormat::Avro,
deletes: vec![],
};
test_fn(task);
}
}