src/expr/statement.rs (370 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use datafusion::logical_expr::{
Deallocate, Execute, Prepare, SetVariable, TransactionAccessMode, TransactionConclusion,
TransactionEnd, TransactionIsolationLevel, TransactionStart,
};
use pyo3::{prelude::*, IntoPyObjectExt};
use crate::{common::data_type::PyDataType, sql::logical::PyLogicalPlan};
use super::{logical_node::LogicalNode, PyExpr};
#[pyclass(name = "TransactionStart", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PyTransactionStart {
transaction_start: TransactionStart,
}
impl From<TransactionStart> for PyTransactionStart {
fn from(transaction_start: TransactionStart) -> PyTransactionStart {
PyTransactionStart { transaction_start }
}
}
impl TryFrom<PyTransactionStart> for TransactionStart {
type Error = PyErr;
fn try_from(py: PyTransactionStart) -> Result<Self, Self::Error> {
Ok(py.transaction_start)
}
}
impl LogicalNode for PyTransactionStart {
fn inputs(&self) -> Vec<PyLogicalPlan> {
vec![]
}
fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
self.clone().into_bound_py_any(py)
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[pyclass(eq, eq_int, name = "TransactionAccessMode", module = "datafusion.expr")]
pub enum PyTransactionAccessMode {
ReadOnly,
ReadWrite,
}
impl From<TransactionAccessMode> for PyTransactionAccessMode {
fn from(access_mode: TransactionAccessMode) -> PyTransactionAccessMode {
match access_mode {
TransactionAccessMode::ReadOnly => PyTransactionAccessMode::ReadOnly,
TransactionAccessMode::ReadWrite => PyTransactionAccessMode::ReadWrite,
}
}
}
impl TryFrom<PyTransactionAccessMode> for TransactionAccessMode {
type Error = PyErr;
fn try_from(py: PyTransactionAccessMode) -> Result<Self, Self::Error> {
match py {
PyTransactionAccessMode::ReadOnly => Ok(TransactionAccessMode::ReadOnly),
PyTransactionAccessMode::ReadWrite => Ok(TransactionAccessMode::ReadWrite),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[pyclass(
eq,
eq_int,
name = "TransactionIsolationLevel",
module = "datafusion.expr"
)]
pub enum PyTransactionIsolationLevel {
ReadUncommitted,
ReadCommitted,
RepeatableRead,
Serializable,
Snapshot,
}
impl From<TransactionIsolationLevel> for PyTransactionIsolationLevel {
fn from(isolation_level: TransactionIsolationLevel) -> PyTransactionIsolationLevel {
match isolation_level {
TransactionIsolationLevel::ReadUncommitted => {
PyTransactionIsolationLevel::ReadUncommitted
}
TransactionIsolationLevel::ReadCommitted => PyTransactionIsolationLevel::ReadCommitted,
TransactionIsolationLevel::RepeatableRead => {
PyTransactionIsolationLevel::RepeatableRead
}
TransactionIsolationLevel::Serializable => PyTransactionIsolationLevel::Serializable,
TransactionIsolationLevel::Snapshot => PyTransactionIsolationLevel::Snapshot,
}
}
}
impl TryFrom<PyTransactionIsolationLevel> for TransactionIsolationLevel {
type Error = PyErr;
fn try_from(value: PyTransactionIsolationLevel) -> Result<Self, Self::Error> {
match value {
PyTransactionIsolationLevel::ReadUncommitted => {
Ok(TransactionIsolationLevel::ReadUncommitted)
}
PyTransactionIsolationLevel::ReadCommitted => {
Ok(TransactionIsolationLevel::ReadCommitted)
}
PyTransactionIsolationLevel::RepeatableRead => {
Ok(TransactionIsolationLevel::RepeatableRead)
}
PyTransactionIsolationLevel::Serializable => {
Ok(TransactionIsolationLevel::Serializable)
}
PyTransactionIsolationLevel::Snapshot => Ok(TransactionIsolationLevel::Snapshot),
}
}
}
#[pymethods]
impl PyTransactionStart {
#[new]
pub fn new(
access_mode: PyTransactionAccessMode,
isolation_level: PyTransactionIsolationLevel,
) -> PyResult<Self> {
let access_mode = access_mode.try_into()?;
let isolation_level = isolation_level.try_into()?;
Ok(PyTransactionStart {
transaction_start: TransactionStart {
access_mode,
isolation_level,
},
})
}
pub fn access_mode(&self) -> PyResult<PyTransactionAccessMode> {
Ok(self.transaction_start.access_mode.clone().into())
}
pub fn isolation_level(&self) -> PyResult<PyTransactionIsolationLevel> {
Ok(self.transaction_start.isolation_level.clone().into())
}
}
#[pyclass(name = "TransactionEnd", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PyTransactionEnd {
transaction_end: TransactionEnd,
}
impl From<TransactionEnd> for PyTransactionEnd {
fn from(transaction_end: TransactionEnd) -> PyTransactionEnd {
PyTransactionEnd { transaction_end }
}
}
impl TryFrom<PyTransactionEnd> for TransactionEnd {
type Error = PyErr;
fn try_from(py: PyTransactionEnd) -> Result<Self, Self::Error> {
Ok(py.transaction_end)
}
}
impl LogicalNode for PyTransactionEnd {
fn inputs(&self) -> Vec<PyLogicalPlan> {
vec![]
}
fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
self.clone().into_bound_py_any(py)
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[pyclass(eq, eq_int, name = "TransactionConclusion", module = "datafusion.expr")]
pub enum PyTransactionConclusion {
Commit,
Rollback,
}
impl From<TransactionConclusion> for PyTransactionConclusion {
fn from(value: TransactionConclusion) -> Self {
match value {
TransactionConclusion::Commit => PyTransactionConclusion::Commit,
TransactionConclusion::Rollback => PyTransactionConclusion::Rollback,
}
}
}
impl TryFrom<PyTransactionConclusion> for TransactionConclusion {
type Error = PyErr;
fn try_from(value: PyTransactionConclusion) -> Result<Self, Self::Error> {
match value {
PyTransactionConclusion::Commit => Ok(TransactionConclusion::Commit),
PyTransactionConclusion::Rollback => Ok(TransactionConclusion::Rollback),
}
}
}
#[pymethods]
impl PyTransactionEnd {
#[new]
pub fn new(conclusion: PyTransactionConclusion, chain: bool) -> PyResult<Self> {
let conclusion = conclusion.try_into()?;
Ok(PyTransactionEnd {
transaction_end: TransactionEnd { conclusion, chain },
})
}
pub fn conclusion(&self) -> PyResult<PyTransactionConclusion> {
Ok(self.transaction_end.conclusion.clone().into())
}
pub fn chain(&self) -> bool {
self.transaction_end.chain
}
}
#[pyclass(name = "SetVariable", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PySetVariable {
set_variable: SetVariable,
}
impl From<SetVariable> for PySetVariable {
fn from(set_variable: SetVariable) -> PySetVariable {
PySetVariable { set_variable }
}
}
impl TryFrom<PySetVariable> for SetVariable {
type Error = PyErr;
fn try_from(py: PySetVariable) -> Result<Self, Self::Error> {
Ok(py.set_variable)
}
}
impl LogicalNode for PySetVariable {
fn inputs(&self) -> Vec<PyLogicalPlan> {
vec![]
}
fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
self.clone().into_bound_py_any(py)
}
}
#[pymethods]
impl PySetVariable {
#[new]
pub fn new(variable: String, value: String) -> Self {
PySetVariable {
set_variable: SetVariable { variable, value },
}
}
pub fn variable(&self) -> String {
self.set_variable.variable.clone()
}
pub fn value(&self) -> String {
self.set_variable.value.clone()
}
}
#[pyclass(name = "Prepare", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PyPrepare {
prepare: Prepare,
}
impl From<Prepare> for PyPrepare {
fn from(prepare: Prepare) -> PyPrepare {
PyPrepare { prepare }
}
}
impl TryFrom<PyPrepare> for Prepare {
type Error = PyErr;
fn try_from(py: PyPrepare) -> Result<Self, Self::Error> {
Ok(py.prepare)
}
}
impl LogicalNode for PyPrepare {
fn inputs(&self) -> Vec<PyLogicalPlan> {
vec![PyLogicalPlan::from((*self.prepare.input).clone())]
}
fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
self.clone().into_bound_py_any(py)
}
}
#[pymethods]
impl PyPrepare {
#[new]
pub fn new(name: String, data_types: Vec<PyDataType>, input: PyLogicalPlan) -> Self {
let input = input.plan().clone();
let data_types = data_types
.into_iter()
.map(|data_type| data_type.into())
.collect();
PyPrepare {
prepare: Prepare {
name,
data_types,
input,
},
}
}
pub fn name(&self) -> String {
self.prepare.name.clone()
}
pub fn data_types(&self) -> Vec<PyDataType> {
self.prepare
.data_types
.clone()
.into_iter()
.map(|t| t.into())
.collect()
}
pub fn input(&self) -> PyLogicalPlan {
PyLogicalPlan {
plan: self.prepare.input.clone(),
}
}
}
#[pyclass(name = "Execute", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PyExecute {
execute: Execute,
}
impl From<Execute> for PyExecute {
fn from(execute: Execute) -> PyExecute {
PyExecute { execute }
}
}
impl TryFrom<PyExecute> for Execute {
type Error = PyErr;
fn try_from(py: PyExecute) -> Result<Self, Self::Error> {
Ok(py.execute)
}
}
impl LogicalNode for PyExecute {
fn inputs(&self) -> Vec<PyLogicalPlan> {
vec![]
}
fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
self.clone().into_bound_py_any(py)
}
}
#[pymethods]
impl PyExecute {
#[new]
pub fn new(name: String, parameters: Vec<PyExpr>) -> Self {
let parameters = parameters
.into_iter()
.map(|parameter| parameter.into())
.collect();
PyExecute {
execute: Execute { name, parameters },
}
}
pub fn name(&self) -> String {
self.execute.name.clone()
}
pub fn parameters(&self) -> Vec<PyExpr> {
self.execute
.parameters
.clone()
.into_iter()
.map(|t| t.into())
.collect()
}
}
#[pyclass(name = "Deallocate", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PyDeallocate {
deallocate: Deallocate,
}
impl From<Deallocate> for PyDeallocate {
fn from(deallocate: Deallocate) -> PyDeallocate {
PyDeallocate { deallocate }
}
}
impl TryFrom<PyDeallocate> for Deallocate {
type Error = PyErr;
fn try_from(py: PyDeallocate) -> Result<Self, Self::Error> {
Ok(py.deallocate)
}
}
impl LogicalNode for PyDeallocate {
fn inputs(&self) -> Vec<PyLogicalPlan> {
vec![]
}
fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
self.clone().into_bound_py_any(py)
}
}
#[pymethods]
impl PyDeallocate {
#[new]
pub fn new(name: String) -> Self {
PyDeallocate {
deallocate: Deallocate { name },
}
}
pub fn name(&self) -> String {
self.deallocate.name.clone()
}
}