stubs/sqlalchemy/engine/base.pyi (147 lines of code) (raw):

# Type stubs for ``sqlalchemy.engine.base``.
#
# Parameters and return values without an annotation are intentionally left
# untyped; type checkers treat them as ``Any``.

from typing import Any, Dict, Optional, Text, Union

from sqlalchemy import log
from sqlalchemy.engine.interfaces import Compiled
from sqlalchemy.schema import DDLElement, DefaultGenerator
from sqlalchemy.sql.expression import ClauseElement
from sqlalchemy.sql.functions import FunctionElement

from .interfaces import Connectable as Connectable
from .interfaces import ExceptionContext as ExceptionContext
from .result import ResultProxy

# Private alias for the kinds of object accepted by ``execute()``.  Shared by
# ``Connection.execute`` and ``Engine.execute`` so the two parallel signatures
# cannot drift apart.
_ExecutableObject = Union[
    Text, ClauseElement, FunctionElement, DDLElement, DefaultGenerator, Compiled
]


# Base transaction stub: exposes close/rollback/commit and the
# context-manager protocol (``__enter__`` / ``__exit__``).
class Transaction(object):
    connection: Any = ...
    is_active: bool = ...

    def __init__(self, connection, parent) -> None: ...
    def close(self): ...
    def rollback(self): ...
    def commit(self): ...
    def __enter__(self): ...
    def __exit__(self, type, value, traceback): ...


# Transaction subclass constructed from a connection only (no parent argument).
class RootTransaction(Transaction):
    def __init__(self, connection) -> None: ...


# Transaction subclass constructed from a connection and a parent.
class NestedTransaction(Transaction):
    def __init__(self, connection, parent) -> None: ...


# Transaction subclass that carries an ``xid`` and adds ``prepare()``.
class TwoPhaseTransaction(Transaction):
    xid: Any = ...

    def __init__(self, connection, xid) -> None: ...
    def prepare(self): ...


# Connectable that hands out Transaction objects and executes statements.
class Connection(Connectable):
    schema_for_object: Any = ...
    engine: Any = ...
    dialect: Any = ...
    should_close_with_result: bool = ...
    dispatch: Any = ...

    def __init__(
        self,
        engine,
        connection: Optional[Any] = ...,
        close_with_result: bool = ...,
        _branch_from: Optional[Any] = ...,
        _execution_options: Optional[Any] = ...,
        _dispatch: Optional[Any] = ...,
        _has_events: Optional[Any] = ...,
    ) -> None: ...
    def __enter__(self): ...
    def __exit__(self, type, value, traceback): ...
    def execution_options(self, **opt): ...
    @property
    def closed(self) -> bool: ...
    @property
    def invalidated(self) -> bool: ...
    @property
    def connection(self): ...
    def get_isolation_level(self) -> str: ...
    @property
    def default_isolation_level(self) -> str: ...
    @property
    def info(self) -> Dict[Any, Any]: ...
    def connect(self): ...
    def contextual_connect(self, **kwargs): ...
    def invalidate(self, exception: Optional[Any] = ...) -> None: ...
    def detach(self) -> None: ...
    def begin(self) -> Transaction: ...
    def begin_nested(self) -> NestedTransaction: ...
    def begin_twophase(self, xid: Optional[Any] = ...) -> TwoPhaseTransaction: ...
    def recover_twophase(self): ...
    def rollback_prepared(self, xid, recover: bool = ...): ...
    def commit_prepared(self, xid, recover: bool = ...): ...
    def in_transaction(self) -> bool: ...
    def close(self) -> None: ...
    def scalar(self, object, *multiparams, **params): ...
    # Annotated identically to ``Engine.execute`` (same parameter shape, same
    # ResultProxy return) so both entry points type-check the same way.
    def execute(
        self,
        object: _ExecutableObject,
        *multiparams: Any,
        **params: Any
    ) -> ResultProxy: ...
    def transaction(self, callable_, *args, **kwargs): ...
    def run_callable(self, callable_, *args, **kwargs): ...


# ExceptionContext implementation; the attributes below mirror most of the
# constructor arguments.
class ExceptionContextImpl(ExceptionContext):
    engine: Any = ...
    connection: Any = ...
    sqlalchemy_exception: Any = ...
    original_exception: Any = ...
    execution_context: Any = ...
    statement: Any = ...
    parameters: Any = ...
    is_disconnect: bool = ...
    invalidate_pool_on_disconnect: bool = ...

    def __init__(
        self,
        exception,
        sqlalchemy_exception,
        engine,
        connection,
        cursor,
        statement,
        parameters,
        context,
        is_disconnect,
        invalidate_pool_on_disconnect,
    ) -> None: ...


# Connectable built from a pool, a dialect and a url; ``connect()`` returns a
# Connection and ``execute()`` returns a ResultProxy.
class Engine(Connectable, log.Identified):
    schema_for_object: Any = ...
    pool: Any = ...
    url: Any = ...
    dialect: Any = ...
    logging_name: Any = ...
    echo: Any = ...
    engine: Any = ...

    def __init__(
        self,
        pool,
        dialect,
        url,
        logging_name: Optional[Any] = ...,
        echo: Optional[Any] = ...,
        proxy: Optional[Any] = ...,
        execution_options: Optional[Any] = ...,
    ) -> None: ...
    def update_execution_options(self, **opt): ...
    def execution_options(self, **opt): ...
    @property
    def name(self): ...
    @property
    def driver(self): ...
    def dispose(self) -> None: ...
    def begin(self, close_with_result: bool = ...): ...
    def transaction(self, callable_, *args, **kwargs): ...
    def run_callable(self, callable_, *args, **kwargs): ...
    def execute(
        self,
        object: _ExecutableObject,
        *multiparams: Any,
        **params: Any
    ) -> ResultProxy: ...
    def scalar(self, statement, *multiparams, **params): ...
    def connect(self, **kwargs) -> Connection: ...
    def contextual_connect(self, close_with_result: bool = ..., **kwargs): ...
    def table_names(
        self, schema: Optional[Any] = ..., connection: Optional[Any] = ...
    ): ...
    def has_table(self, table_name, schema: Optional[Any] = ...): ...
    def raw_connection(self, _connection: Optional[Any] = ...): ...


# Engine subclass constructed from a ``proxied`` object plus execution options.
class OptionEngine(Engine):
    url: Any = ...
    dialect: Any = ...
    logging_name: Any = ...
    echo: Any = ...
    dispatch: Any = ...

    def __init__(self, proxied, execution_options) -> None: ...

    # Declared after __init__ in the original stub; kept in the same position.
    pool: Any = ...