in datafusion/datasource-parquet/src/opener.rs [88:320]
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
let file_range = file_meta.range.clone();
let extensions = file_meta.extensions.clone();
let file_name = file_meta.location().to_string();
let file_metrics =
ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);
let metadata_size_hint = file_meta.metadata_size_hint.or(self.metadata_size_hint);
let mut async_file_reader: Box<dyn AsyncFileReader> =
self.parquet_file_reader_factory.create_reader(
self.partition_index,
file_meta,
metadata_size_hint,
&self.metrics,
)?;
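// The reader factory lets callers customize I/O (for example to cache
// footers); the default implementation (not shown here) typically reads
// via the object store and tracks bytes scanned in `file_metrics`.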
let batch_size = self.batch_size;
let projected_schema =
SchemaRef::from(self.table_schema.project(&self.projection)?);
let schema_adapter_factory = Arc::clone(&self.schema_adapter_factory);
let schema_adapter = self
.schema_adapter_factory
.create(projected_schema, Arc::clone(&self.table_schema));
let predicate = self.predicate.clone();
let table_schema = Arc::clone(&self.table_schema);
let reorder_predicates = self.reorder_filters;
let pushdown_filters = self.pushdown_filters;
let coerce_int96 = self.coerce_int96;
let enable_bloom_filter = self.enable_bloom_filter;
let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
let limit = self.limit;
let predicate_creation_errors = MetricBuilder::new(&self.metrics)
.global_counter("num_predicate_creation_errors");
let enable_page_index = self.enable_page_index;
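// Everything needed below is cloned or moved out of `self` above so the
// returned future is self-contained ('static) and can be polled after
// this call returns.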
Ok(Box::pin(async move {
// Don't load the page index yet. Since it is not stored inline in
// the footer, loading the page index when it is not needed would do
// unnecessary I/O. We decide later whether it is needed to evaluate
// the pruning predicates, so default to not requesting it from the
// underlying reader.
let mut options = ArrowReaderOptions::new().with_page_index(false);
let mut metadata_timer = file_metrics.metadata_load_time.timer();
// Begin by loading the metadata from the underlying reader (note
// the returned metadata may actually include page indexes as some
// readers may return page indexes even when not requested -- for
// example when they are cached)
let mut reader_metadata =
ArrowReaderMetadata::load_async(&mut async_file_reader, options.clone())
.await?;
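// `reader_metadata` bundles the decoded footer (row group and column
// metadata) together with the Arrow schema inferred from the parquet
// schema. Loading it once here lets the same metadata drive both the
// pruning below and the record batch stream, avoiding a second footer read.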
// Note about schemas: we are actually dealing with **3 different schemas** here:
// - The table schema as defined by the TableProvider. This is what the user sees, what they get when they `SELECT * FROM table`, etc.
// - The "virtual" file schema: the table schema with any hive partition columns removed and the projection applied. This is what the file schema is coerced to.
// - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains.
let mut physical_file_schema = Arc::clone(reader_metadata.schema());
// The schema loaded from the file may not be the same as the
// desired schema (for example if we want to instruct the parquet
// reader to read strings using Utf8View instead). Update if necessary
if let Some(merged) =
apply_file_schema_type_coercions(&table_schema, &physical_file_schema)
{
physical_file_schema = Arc::new(merged);
options = options.with_schema(Arc::clone(&physical_file_schema));
reader_metadata = ArrowReaderMetadata::try_new(
Arc::clone(reader_metadata.metadata()),
options.clone(),
)?;
}
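// Supplying the coerced schema via `with_schema` (and rebuilding the
// reader metadata) means the reader decodes directly into the desired
// types rather than relying on a cast afterwards.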
if let Some(resolution) = coerce_int96.as_ref() {
if let Some(merged) = coerce_int96_to_resolution(
reader_metadata.parquet_schema(),
&physical_file_schema,
resolution,
) {
physical_file_schema = Arc::new(merged);
options = options.with_schema(Arc::clone(&physical_file_schema));
reader_metadata = ArrowReaderMetadata::try_new(
Arc::clone(reader_metadata.metadata()),
options.clone(),
)?;
}
}
// Build predicates for this specific file
let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
predicate.as_ref(),
&physical_file_schema,
&predicate_creation_errors,
);
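// `pruning_predicate` is evaluated against row group statistics, while
// `page_pruning_predicate` is evaluated against the page index. Either
// may be None if the predicate cannot be converted; such failures are
// counted in `predicate_creation_errors` rather than returned as errors.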
// The page index is not stored inline in the parquet footer so the
// code above may not have read the page index structures yet. If we
// need them for reading and they aren't yet loaded, we need to load them now.
if should_enable_page_index(enable_page_index, &page_pruning_predicate) {
reader_metadata = load_page_index(
reader_metadata,
&mut async_file_reader,
// Since we're manually loading the page index, the option here should not matter, but we pass it in for consistency
options.with_page_index(true),
)
.await?;
}
metadata_timer.stop();
let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
async_file_reader,
reader_metadata,
);
let (schema_mapping, adapted_projections) =
schema_adapter.map_schema(&physical_file_schema)?;
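// `adapted_projections` holds the indexes of the projected columns that
// actually exist in this file; `schema_mapping` is used later to cast and
// reorder each decoded batch (and, with the default schema adapter, fill
// columns missing from the file with nulls) to match the table schema.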
let mask = ProjectionMask::roots(
builder.parquet_schema(),
adapted_projections.iter().cloned(),
);
// Filter pushdown: evaluate predicates during scan
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
let row_filter = row_filter::build_row_filter(
&predicate,
&physical_file_schema,
&table_schema,
builder.metadata(),
reorder_predicates,
&file_metrics,
&schema_adapter_factory,
);
match row_filter {
Ok(Some(filter)) => {
builder = builder.with_row_filter(filter);
}
Ok(None) => {}
Err(e) => {
debug!(
"Ignoring error building row filter for '{:?}': {}",
predicate, e
);
}
};
};
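// Building the row filter is best-effort: on error the predicate is
// simply not evaluated during decoding, and the row group / page pruning
// below still applies.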
// Determine which row groups to actually read. The idea is to skip
// as many row groups as possible based on the metadata and query
let file_metadata = Arc::clone(builder.metadata());
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let rg_metadata = file_metadata.row_groups();
// track which row groups to actually read
let access_plan =
create_initial_plan(&file_name, extensions, rg_metadata.len())?;
let mut row_groups = RowGroupAccessPlanFilter::new(access_plan);
// if there is a range restricting what parts of the file to read
if let Some(range) = file_range.as_ref() {
row_groups.prune_by_range(rg_metadata, range);
}
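// (A `file_range` is present when a single file is split across multiple
// partitions; the range check above is meant to ensure each row group is
// read by exactly one partition.)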
// If there is a predicate that can be evaluated against the metadata
if let Some(predicate) = predicate.as_ref() {
if enable_row_group_stats_pruning {
row_groups.prune_by_statistics(
&physical_file_schema,
builder.parquet_schema(),
rg_metadata,
predicate,
&file_metrics,
);
}
if enable_bloom_filter && !row_groups.is_empty() {
row_groups
.prune_by_bloom_filters(
&physical_file_schema,
&mut builder,
predicate,
&file_metrics,
)
.await;
}
}
let mut access_plan = row_groups.build();
// page index pruning: if all data on individual pages can
// be ruled out using the page metadata, the corresponding rows
// in the other columns can be skipped as well
if enable_page_index && !access_plan.is_empty() {
if let Some(p) = page_pruning_predicate {
access_plan = p.prune_plan_with_page_index(
access_plan,
&physical_file_schema,
builder.parquet_schema(),
file_metadata.as_ref(),
&file_metrics,
);
}
}
let row_group_indexes = access_plan.row_group_indexes();
if let Some(row_selection) =
access_plan.into_overall_row_selection(rg_metadata)?
{
builder = builder.with_row_selection(row_selection);
}
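// `into_overall_row_selection` flattens any per-row-group selections
// (e.g. from page index pruning) into a single `RowSelection` expressed
// relative to the row groups that will actually be read.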
if let Some(limit) = limit {
builder = builder.with_limit(limit)
}
let stream = builder
.with_projection(mask)
.with_batch_size(batch_size)
.with_row_groups(row_group_indexes)
.build()?;
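// Adapt each decoded batch back to the table schema: `map_batch` casts and
// reorders columns (and, with the default adapter, fills columns missing
// from the file) so downstream operators see the projected table schema.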
let adapted = stream
.map_err(|e| ArrowError::ExternalError(Box::new(e)))
.map(move |maybe_batch| {
maybe_batch
.and_then(|b| schema_mapping.map_batch(b).map_err(Into::into))
});
Ok(adapted.boxed())
}))
}