in agora/cerebral_api/src/SqlDBHandler.py [0:0]
def execute_query_and_return_data(self, query: str) -> Union[str, List[Dict[str, Any]]]:
"""
Execute a SQL query and return serialized results.
Args:
query (str): The SQL query to execute
Returns:
Union[str, List[Dict[str, Any]]]: Query results or error message
"""
request_id = datetime.now().strftime('%Y%m%d%H%M%S')
try:
self.logger.info(f"[{request_id}] Executing query: {query}")
# Ensure connection is valid
if not self.connect():
return "Error: Could not establish database connection"
cursor = self.conn.cursor()
# Execute the query
cursor.execute(query)
# Get column names
columns = [column[0] for column in cursor.description]
# Fetch results
rows = cursor.fetchall()
if not rows:
self.logger.info(f"[{request_id}] Query returned no results")
return "No data found for the specified query"
# Convert results to list of dictionaries
results = []
for row in rows:
# Convert row values to Python types
processed_row = {}
for i, value in enumerate(row):
if isinstance(value, datetime):
processed_row[columns[i]] = value.isoformat()
elif isinstance(value, (int, float, str, bool)):
processed_row[columns[i]] = value
else:
processed_row[columns[i]] = str(value)
results.append(processed_row)
self.logger.info(f"[{request_id}] Successfully processed {len(results)} rows")
return results
except pyodbc.ProgrammingError as pe:
error_message = f"SQL syntax error: {str(pe)}"
self.logger.error(f"[{request_id}] {error_message}")
return f"Error: {error_message}"
except pyodbc.Error as e:
error_message = f"Database error: {str(e)}"
self.logger.error(f"[{request_id}] {error_message}")
return f"Error: {error_message}"
except Exception as e:
error_message = f"An unexpected error occurred: {str(e)}"
self.logger.error(f"[{request_id}] {error_message}")
return f"Error: {error_message}"
finally:
if 'cursor' in locals():
cursor.close()