utils/alchemy_db_client.py (178 lines of code) (raw):
from typing import Any
from sqlalchemy import create_engine, inspect, text
from sqlalchemy.exc import SQLAlchemyError
from urllib.parse import quote_plus # For URL encoding
# Define version constant
APP_VERSION = "0.1.0"
def get_db_schema(
db_type: str,
host: str,
port: int,
database: str,
username: str,
password: str,
table_names: str | None = None
) -> dict[str, Any] | None:
"""
Get database table structure information
:param db_type: Database type (mysql/oracle/sqlserver/hologres)
:param host: Host address
:param port: Port number
:param database: Database name
:param username: Username
:param password: Password
:param table_names: Tables to query, comma-separated string, if None, query all tables
:return: Dictionary containing all table structure information
"""
result: dict[str, Any] = {}
# Build connection URL
driver = {
'mysql': 'pymysql',
'oracle': 'cx_oracle',
'sqlserver': 'pymssql',
'hologres': 'psycopg2'
}.get(db_type.lower(), '')
encoded_username = quote_plus(username)
encoded_password = quote_plus(password)
# Handle Hologres type, use PostgreSQL connection method
actual_db_type = db_type.lower()
if actual_db_type == 'hologres':
actual_db_type = 'postgresql'
# Create database engine
connection_url = f'{actual_db_type}+{driver}://{encoded_username}:{encoded_password}@{host}:{port}/{database}'
# Add application_name parameter with version for PostgreSQL/Hologres
if db_type.lower() == 'hologres' or db_type.lower() == 'postgresql':
connection_url += f'?application_name=hologres_text2data_from_dify_v{APP_VERSION}'
engine = create_engine(connection_url)
inspector = inspect(engine)
# SQL statements for getting column comments
column_comment_sql = {
'mysql': f"SELECT COLUMN_COMMENT FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = '{database}' AND TABLE_NAME = :table_name AND COLUMN_NAME = :column_name",
'oracle': "SELECT COMMENTS FROM ALL_COL_COMMENTS WHERE TABLE_NAME = :table_name AND COLUMN_NAME = :column_name",
'sqlserver': "SELECT CAST(ep.value AS NVARCHAR(MAX)) FROM sys.columns c LEFT JOIN sys.extended_properties ep ON ep.major_id = c.object_id AND ep.minor_id = c.column_id WHERE OBJECT_NAME(c.object_id) = :table_name AND c.name = :column_name",
'hologres': """
SELECT
d.description AS column_comment
FROM
pg_class c
LEFT JOIN pg_namespace n ON c.relnamespace = n.oid
LEFT JOIN pg_attribute a ON c.oid = a.attrelid
LEFT JOIN pg_description d ON a.attrelid = d.objoid AND a.attnum = d.objsubid
WHERE
c.relkind IN ('r', 'f', 'v')
AND a.attnum > 0
AND NOT a.attisdropped
AND n.nspname = :schema_name
AND c.relname = :table_name
AND a.attname = :column_name
AND n.nspname = :schema_name;
"""
}.get(db_type.lower(), "")
try:
# Get all table names
all_tables = []
# For PostgreSQL/Hologres, consider schema structure
if db_type.lower() == 'hologres' or db_type.lower() == 'postgresql':
# Get all schemas
schemas = inspector.get_schema_names()
# Exclude system schemas
excluded_schemas = ['pg_catalog', 'information_schema']
# For Hologres, exclude additional system schemas
if db_type.lower() == 'hologres':
excluded_schemas.extend(['hologres', 'hologres_statistic', 'hologres_streaming_mv'])
schemas = [s for s in schemas if s not in excluded_schemas and not s.startswith('pg_')]
# Get tables for each schema
for schema in schemas:
# Get regular tables
schema_tables = inspector.get_table_names(schema=schema)
all_tables.extend([f"{schema}.{table}" for table in schema_tables])
# Get views
schema_views = inspector.get_view_names(schema=schema)
all_tables.extend([f"{schema}.{view}" for view in schema_views])
# For Hologres, get partition tables and foreign tables with additional SQL query
if db_type.lower() == 'hologres' or db_type.lower() == 'postgresql':
try:
with engine.connect() as conn:
# Query for partition parent tables and foreign tables
special_tables_sql = text("""
SELECT c.relname as table_name
FROM pg_class c
JOIN pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = :schema_name
AND c.relkind IN ('p', 'f') -- 'p' for partition parent tables, 'f' for foreign tables
AND NOT EXISTS (
SELECT 1 FROM pg_inherits i
WHERE i.inhrelid = c.oid
)
""")
# Modified: Use different variable name query_result instead of result
query_result = conn.execute(special_tables_sql, {"schema_name": schema})
# Fix: Access the first element of the tuple using index 0, or use keys() to get column names
special_tables = []
for row in query_result:
if hasattr(row, '_mapping'): # SQLAlchemy 1.4+
special_tables.append(row._mapping['table_name'])
elif hasattr(row, 'keys'): # Compatible with older versions
special_tables.append(row[row.keys().index('table_name')])
else: # Use index directly
special_tables.append(row[0])
all_tables.extend([f"{schema}.{table}" for table in special_tables])
except SQLAlchemyError as e:
print(f"Warning: failed to get special tables for schema {schema}: {e}")
else:
# Other database types remain unchanged
all_tables = inspector.get_table_names()
# If table_names is specified, filter table names
target_tables = all_tables
if table_names:
target_tables = [table.strip() for table in table_names.split(',')]
# Filter for tables that actually exist
target_tables = [table for table in target_tables if table in all_tables]
print(f"Retrieving table metadata for {len(target_tables)} tables...")
for table_name in target_tables:
# Handle table names that may include schema
schema_name = 'public' # Default schema
actual_table_name = table_name
if '.' in table_name and (db_type.lower() == 'hologres' or db_type.lower() == 'postgresql'):
schema_name, actual_table_name = table_name.split('.', 1)
# Get table comments
table_comment = ""
try:
table_comment = inspector.get_table_comment(actual_table_name, schema=schema_name).get("text") or ""
except SQLAlchemyError as e:
raise ValueError(f"Failed to retrieve table comments: {str(e)}")
table_info = {
'comment': table_comment,
'columns': []
}
for column in inspector.get_columns(actual_table_name, schema=schema_name):
# Get column comments
column_comment = ""
try:
with engine.connect() as conn:
stmt = text(column_comment_sql)
params = {
'table_name': actual_table_name,
'column_name': column['name']
}
# Add schema parameter for PostgreSQL/Hologres
if db_type.lower() == 'hologres' or db_type.lower() == 'postgresql':
params['schema_name'] = schema_name
column_comment = conn.execute(stmt, params).scalar() or ""
except SQLAlchemyError as e:
print(f"Warning: failed to get comment for {table_name}.{column['name']} - {e}")
column_comment = ""
table_info['columns'].append({
'name': column['name'],
'comment': column_comment,
'type': str(column['type'])
})
result[table_name] = table_info
return result
except SQLAlchemyError as e:
raise ValueError(f"Failed to retrieve database table metadata: {str(e)}")
finally:
engine.dispose()
def format_schema_dsl(schema: dict[str, Any], with_type: bool = True, with_comment: bool = False) -> str:
"""
Compress database table structure into DSL format
:param schema: Structure returned by get_db_schema
:param with_type: Whether to keep field types
:param with_comment: Whether to keep field comments
:return: Compressed DSL string
"""
type_aliases = {
'INTEGER': 'i', 'INT': 'i', 'BIGINT': 'i', 'SMALLINT': 'i', 'TINYINT': 'i',
'VARCHAR': 's', 'TEXT': 's', 'CHAR': 's',
'DATETIME': 'dt', 'TIMESTAMP': 'dt', 'DATE': 'dt',
'DECIMAL': 'f', 'NUMERIC': 'f', 'FLOAT': 'f', 'DOUBLE': 'f',
'BOOLEAN': 'b', 'BOOL': 'b',
'JSON': 'j'
}
lines = []
for table_name, table_data in schema.items():
column_parts = []
for col in table_data['columns']:
parts = [col['name']]
if with_type:
raw_type = col['type'].split('(')[0].upper()
col_type = type_aliases.get(raw_type, raw_type.lower())
parts.append(col_type)
if with_comment and col.get('comment'):
parts.append(f"# {col['comment']}")
column_parts.append(":".join(parts))
# Build table comment
if with_comment and table_data.get('comment'):
lines.append(f"# {table_data['comment']}")
lines.append(f"T:{table_name}({', '.join(column_parts)})")
return "\n".join(lines)
def execute_sql(
db_type: str,
host: str,
port: int,
database: str,
username: str,
password: str,
sql: str,
params: dict[str, Any] | None = None
) -> list[dict[str, Any]] | dict[str, Any] | None:
"""
Function to connect to different types of databases and execute SQL statements.
Parameters:
db_type: Database type, e.g., 'mysql', 'oracle', 'sqlserver', 'hologres'
host: Database host address
port: Database port number
database: Database name
username: Username
password: Password
sql: SQL statement to execute
params: SQL parameter dictionary (optional)
Returns:
If executing a query statement, returns a list where each element is a row dictionary;
If executing a non-query statement, returns a dictionary containing the number of affected rows, e.g., {"rowcount": 3}
"""
driver = {
'mysql': 'pymysql',
'oracle': 'cx_oracle',
'sqlserver': 'pymssql',
'hologres': 'psycopg2'
}.get(db_type.lower(), '')
encoded_username = quote_plus(username)
encoded_password = quote_plus(password)
# Handle Hologres type, use PostgreSQL connection method
actual_db_type = db_type.lower()
if actual_db_type == 'hologres':
actual_db_type = 'postgresql'
# Create database engine
connection_url = f'{actual_db_type}+{driver}://{encoded_username}:{encoded_password}@{host}:{port}/{database}'
# Add application_name parameter with version for PostgreSQL/Hologres
if db_type.lower() == 'hologres' or db_type.lower() == 'postgresql':
connection_url += f'?application_name=hologres_text2data_from_dify_v{APP_VERSION}'
engine = create_engine(connection_url)
try:
# Use begin() to ensure transaction auto-commit
with engine.begin() as conn:
stmt = text(sql)
result_proxy = conn.execute(stmt, params or {})
# If returning row data, it's a query statement
if result_proxy.returns_rows:
rows = result_proxy.fetchall()
keys = result_proxy.keys()
return [dict(zip(keys, row)) for row in rows]
else:
# Non-query statement returns the number of affected rows
return {"rowcount": result_proxy.rowcount}
except SQLAlchemyError as e:
raise ValueError(f"Database error: {str(e)}")
finally:
engine.dispose()