# tools/hologres_text2data.py (78 lines of code) (raw):
import logging
import re
from cmath import e
from collections.abc import Generator
from typing import Any

from dify_plugin import Tool
from dify_plugin.entities.model.llm import LLMModelConfig
from dify_plugin.entities.model.message import SystemPromptMessage, UserPromptMessage
from dify_plugin.entities.tool import ToolInvokeMessage

from utils.alchemy_db_client import format_schema_dsl
from utils.alchemy_db_client import get_db_schema
from utils.prompt_loader import PromptLoader
logger = logging.getLogger(__name__)


class HologresText2dataTool(Tool):
    """Dify tool that asks an LLM to write SQL for a natural-language request.

    The tool reads the schema of the configured database, renders it into a
    system prompt, sends that prompt together with the user's requirement to
    the selected model, and yields the model's text answer.
    """

    # Returned to the caller whenever no usable text answer is available.
    _FAILURE_MESSAGE = "Generation failed, please check if the input parameters are correct"

    # Fenced block tagged "sql"; case-insensitive so that "```SQL" also matches.
    _CODE_BLOCK_RE = re.compile(r'(?si)```sql(.*?)```')
    # A statement that starts the text with a known SQL verb. The lazy body
    # stops at the first ';' or at the end of the text, whichever comes first.
    _BARE_SQL_RE = re.compile(
        r'(?si)^\s*((?:SELECT|INSERT|UPDATE|DELETE|WITH|CREATE|ALTER|DROP).+?)(;|$|\n\s*$)',
        re.DOTALL,
    )
    _WHITESPACE_RUN_RE = re.compile(r'[\n\r\t]+')
    _SQL_KEYWORDS = ('SELECT', 'FROM', 'WHERE')

    def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[ToolInvokeMessage]:
        """Generate SQL text for ``tool_parameters['query']`` via the LLM.

        Required keys: ``db_type``, ``host``, ``port``, ``db_name``,
        ``username``, ``password`` and ``query``. Optional keys: ``model``,
        ``table_names``, ``with_comment`` (default ``False``), ``limit``
        (default ``100``), ``custom_prompt`` (default ``''``) and
        ``result_format``.

        Yields:
            A JSON message ``{"excute_sql": <text>}`` when ``result_format``
            is ``'json'``, otherwise a text message with the model output.
            A text message carrying the failure notice is yielded when no
            model is configured or the model did not return a string.

        NOTE(review): the model output is returned as-is; this method does
        not call ``_extract_sql_from_text``, so any Markdown fence emitted by
        the model is passed through. Confirm whether that is intended.
        """
        model_info = tool_parameters.get('model')
        if not model_info:
            # Without a model selection the LLM call below cannot be built;
            # fail before opening a database connection.
            yield self.create_text_message(self._FAILURE_MESSAGE)
            return

        db_type = tool_parameters['db_type']
        meta_data = get_db_schema(
            db_type=db_type,
            host=tool_parameters['host'],
            port=tool_parameters['port'],
            database=tool_parameters['db_name'],
            username=tool_parameters['username'],
            password=tool_parameters['password'],
            # Absent key means "no table filter" is passed to the helper.
            table_names=tool_parameters.get('table_names', None),
        )
        with_comment = tool_parameters.get('with_comment', False)
        dsl_text = format_schema_dsl(meta_data, with_type=True, with_comment=with_comment)
        logger.debug("Schema DSL:\n%s", dsl_text)

        # Render the system prompt from the schema description.
        prompt_loader = PromptLoader()
        context = {
            'db_type': db_type.upper(),
            'meta_data': dsl_text,
        }
        system_prompt = prompt_loader.get_prompt(
            db_type=db_type,
            context=context,
            limit=tool_parameters.get('limit', 100),
            user_custom_prompt=tool_parameters.get('custom_prompt', ''),
        )
        logger.debug("System prompt:\n%s", system_prompt)

        response = self.session.model.llm.invoke(
            model_config=LLMModelConfig(
                provider=model_info.get('provider'),
                model=model_info.get('model'),
                mode=model_info.get('mode'),
                completion_params=model_info.get('completion_params'),
            ),
            prompt_messages=[
                SystemPromptMessage(content=system_prompt),
                UserPromptMessage(
                    content=f"Database type: {db_type}\n"
                            f"User requirement: {tool_parameters['query']}"
                ),
            ],
            stream=False,
        )
        logger.debug("LLM response: %s", response)

        excute_sql = response.message.content
        if not isinstance(excute_sql, str):
            # Anything other than plain text is treated as a failed generation.
            yield self.create_text_message(self._FAILURE_MESSAGE)
            return

        # A missing 'result_format' falls back to the plain-text answer.
        if tool_parameters.get('result_format') == 'json':
            yield self.create_json_message({
                "excute_sql": excute_sql
            })
        else:
            yield self.create_text_message(excute_sql)

    def _extract_sql_from_text(self, text: str) -> str:
        """Extract SQL from ``text``, with or without a code-block wrapper.

        Three strategies are tried in order:

        1. the content of the first fenced block tagged ``sql``, stripped;
        2. a statement at the very start of the text that begins with
           SELECT/INSERT/UPDATE/DELETE/WITH/CREATE/ALTER/DROP, cut at the
           first ``;`` (kept in the result) or at the end of the text;
        3. the whole text with newline/tab runs collapsed to single spaces,
           provided it contains SELECT, FROM or WHERE.

        Returns an empty string when ``text`` is empty or nothing matches.
        """
        if not text:
            return ""

        # Case 1: SQL wrapped in a fenced code block.
        fenced = self._CODE_BLOCK_RE.search(text)
        if fenced:
            return fenced.group(1).strip()

        # Case 2: bare SQL statement at the beginning of the text.
        bare = self._BARE_SQL_RE.search(text)
        if bare:
            # Drop stray terminators, then restore exactly one ';' if the
            # statement was actually terminated by one.
            sql = bare.group(1).rstrip(';').strip()
            return f"{sql};" if bare.group(2) == ';' else sql

        # Case 3: fall back to the flattened text if it looks SQL-like.
        flattened = self._WHITESPACE_RUN_RE.sub(' ', text).strip()
        upper = flattened.upper()
        return flattened if any(kw in upper for kw in self._SQL_KEYWORDS) else ""