src/expr/table_scan.rs (106 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. use datafusion::common::TableReference; use datafusion::logical_expr::logical_plan::TableScan; use pyo3::{prelude::*, IntoPyObjectExt}; use std::fmt::{self, Display, Formatter}; use crate::expr::logical_node::LogicalNode; use crate::sql::logical::PyLogicalPlan; use crate::{common::df_schema::PyDFSchema, expr::PyExpr}; #[pyclass(name = "TableScan", module = "datafusion.expr", subclass)] #[derive(Clone)] pub struct PyTableScan { table_scan: TableScan, } impl PyTableScan { pub fn new(table_scan: TableScan) -> Self { Self { table_scan } } } impl From<PyTableScan> for TableScan { fn from(tbl_scan: PyTableScan) -> TableScan { tbl_scan.table_scan } } impl From<TableScan> for PyTableScan { fn from(table_scan: TableScan) -> PyTableScan { PyTableScan { table_scan } } } impl Display for PyTableScan { fn fmt(&self, f: &mut Formatter) -> fmt::Result { write!( f, "TableScan\nTable Name: {} Projections: {:?} Projected Schema: {:?} Filters: {:?}", &self.table_scan.table_name, &self.py_projections(), &self.py_schema(), &self.py_filters(), ) } } #[pymethods] impl PyTableScan { /// Retrieves the name of the table represented by this `TableScan` instance #[pyo3(name = "table_name")] fn py_table_name(&self) -> PyResult<String> { Ok(format!("{}", self.table_scan.table_name)) } #[pyo3(name = "fqn")] fn fqn(&self) -> PyResult<(Option<String>, Option<String>, String)> { let table_ref: TableReference = self.table_scan.table_name.clone(); Ok(match table_ref { TableReference::Bare { table } => (None, None, table.to_string()), TableReference::Partial { schema, table } => { (None, Some(schema.to_string()), table.to_string()) } TableReference::Full { catalog, schema, table, } => ( Some(catalog.to_string()), Some(schema.to_string()), table.to_string(), ), }) } /// The column indexes that should be. Note if this is empty then /// all columns should be read by the `TableProvider`. This function /// provides a Tuple of the (index, column_name) to make things simpler /// for the calling code since often times the name is preferred to /// the index which is a lower level abstraction. #[pyo3(name = "projection")] fn py_projections(&self) -> PyResult<Vec<(usize, String)>> { match &self.table_scan.projection { Some(indices) => { let schema = self.table_scan.source.schema(); Ok(indices .iter() .map(|i| (*i, schema.field(*i).name().to_string())) .collect()) } None => Ok(vec![]), } } /// Resulting schema from the `TableScan` operation #[pyo3(name = "schema")] fn py_schema(&self) -> PyResult<PyDFSchema> { Ok((*self.table_scan.projected_schema).clone().into()) } /// Certain `TableProvider` physical readers offer the capability to filter rows that /// are read at read time. These `filters` are contained here. #[pyo3(name = "filters")] fn py_filters(&self) -> PyResult<Vec<PyExpr>> { Ok(self .table_scan .filters .iter() .map(|expr| PyExpr::from(expr.clone())) .collect()) } /// Optional number of rows that should be read at read time by the `TableProvider` #[pyo3(name = "fetch")] fn py_fetch(&self) -> PyResult<Option<usize>> { Ok(self.table_scan.fetch) } fn __repr__(&self) -> PyResult<String> { Ok(format!("TableScan({})", self)) } } impl LogicalNode for PyTableScan { fn inputs(&self) -> Vec<PyLogicalPlan> { // table scans are leaf nodes and do not have inputs vec![] } fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> { self.clone().into_bound_py_any(py) } }