src/catalog.rs (100 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::collections::HashSet;
use std::sync::Arc;
use pyo3::exceptions::PyKeyError;
use pyo3::prelude::*;
use crate::errors::DataFusionError;
use crate::utils::wait_for_future;
use datafusion::{
arrow::pyarrow::ToPyArrow,
catalog::{schema::SchemaProvider, CatalogProvider},
datasource::{TableProvider, TableType},
};
#[pyclass(name = "Catalog", module = "datafusion", subclass)]
pub(crate) struct PyCatalog {
catalog: Arc<dyn CatalogProvider>,
}
#[pyclass(name = "Database", module = "datafusion", subclass)]
pub(crate) struct PyDatabase {
database: Arc<dyn SchemaProvider>,
}
#[pyclass(name = "Table", module = "datafusion", subclass)]
pub struct PyTable {
table: Arc<dyn TableProvider>,
}
impl PyCatalog {
pub fn new(catalog: Arc<dyn CatalogProvider>) -> Self {
Self { catalog }
}
}
impl PyDatabase {
pub fn new(database: Arc<dyn SchemaProvider>) -> Self {
Self { database }
}
}
impl PyTable {
pub fn new(table: Arc<dyn TableProvider>) -> Self {
Self { table }
}
pub fn table(&self) -> Arc<dyn TableProvider> {
self.table.clone()
}
}
#[pymethods]
impl PyCatalog {
fn names(&self) -> Vec<String> {
self.catalog.schema_names()
}
#[pyo3(signature = (name="public"))]
fn database(&self, name: &str) -> PyResult<PyDatabase> {
match self.catalog.schema(name) {
Some(database) => Ok(PyDatabase::new(database)),
None => Err(PyKeyError::new_err(format!(
"Database with name {name} doesn't exist."
))),
}
}
fn __repr__(&self) -> PyResult<String> {
Ok(format!(
"Catalog(schema_names=[{}])",
self.names().join(";")
))
}
}
#[pymethods]
impl PyDatabase {
fn names(&self) -> HashSet<String> {
self.database.table_names().into_iter().collect()
}
fn table(&self, name: &str, py: Python) -> PyResult<PyTable> {
if let Some(table) = wait_for_future(py, self.database.table(name)) {
Ok(PyTable::new(table))
} else {
Err(DataFusionError::Common(format!("Table not found: {name}")).into())
}
}
fn __repr__(&self) -> PyResult<String> {
Ok(format!(
"Database(table_names=[{}])",
Vec::from_iter(self.names()).join(";")
))
}
// register_table
// deregister_table
}
#[pymethods]
impl PyTable {
/// Get a reference to the schema for this table
#[getter]
fn schema(&self, py: Python) -> PyResult<PyObject> {
self.table.schema().to_pyarrow(py)
}
/// Get the type of this table for metadata/catalog purposes.
#[getter]
fn kind(&self) -> &str {
match self.table.table_type() {
TableType::Base => "physical",
TableType::View => "view",
TableType::Temporary => "temporary",
}
}
fn __repr__(&self) -> PyResult<String> {
let kind = self.kind();
Ok(format!("Table(kind={kind})"))
}
// fn scan
// fn statistics
// fn has_exact_statistics
// fn supports_filter_pushdown
}