chat.py (200 lines of code) (raw):
#!/usr/bin/env python3
"""
chat.py
A command-line client to interact with the ORC API endpoint in a continuous chat manner.
This script reads the API `URI` and `X_FUNCTIONS_KEY` from a `.env` file,
allows the user to have a continuous conversation with the orchestrator,
and handles special keyboard inputs to control the flow.
Usage:
python chat.py
Alternatively, after making the script executable:
./chat.py
Environment Variables:
- ORCHESTRATOR_ENDPOINT: The API endpoint URI.
- FUNCTION_KEY: The API access key.
- CALL_ORCHESTRATOR_ENDPOINT: Set to "True" to use the remote orchestrator.
Requirements:
- Python 3.x
- requests library (`pip install requests`)
- python-dotenv library (`pip install python-dotenv`)
Security Note:
Ensure that your `.env` file is not committed to version control systems
as it contains sensitive information. Add `.env` to your `.gitignore` file.
"""
import os
import sys
import json
import requests
from dotenv import load_dotenv
import logging
import logging.config
from orchestration import Orchestrator
import asyncio
import warnings
warnings.filterwarnings("ignore", category=UserWarning)
LOGGING_CONFIG = {
'version': 1,
'disable_existing_loggers': False, # Allow existing loggers to propagate
'formatters': {
'standard': {
'format': '%(asctime)s - %(levelname)s - %(name)s - %(message)s',
},
},
'handlers': {
'file_handler': {
'class': 'logging.FileHandler',
'filename': 'output.log',
'mode': 'a',
'formatter': 'standard',
'level': 'INFO',
},
'console_handler': {
'class': 'logging.StreamHandler',
'stream': sys.stdout,
'formatter': 'standard',
'level': 'ERROR',
},
},
'root': {
'handlers': ['file_handler', 'console_handler'],
'level': 'DEBUG',
},
'loggers': {
# Explicitly configure external loggers to propagate to root
'shared.util': {
'handlers': ['file_handler'], # Only file handler
'level': 'INFO',
'propagate': True,
},
# Add more external loggers here if needed
},
}
# Apply the logging configuration
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger(__name__) # Use a module-specific logger
def get_rest_api_config():
"""
Load environment variables from a `.env` file.
Exits the program if required environment variables are missing.
Returns:
tuple: Contains uri (str), x_functions_key (str)
"""
load_dotenv()
uri = os.getenv('ORCHESTRATOR_ENDPOINT')
x_functions_key = os.getenv('FUNCTION_KEY')
if not uri:
logger.error("ORCHESTRATOR_ENDPOINT not found in environment variables.")
sys.exit(1)
if not x_functions_key:
logger.error("FUNCTION_KEY not found in environment variables.")
sys.exit(1)
return uri, x_functions_key
def get_user_input():
"""
Prompt the user to input a question.
Returns:
str: The user's input question, or special commands like 'CTRL_D'.
"""
try:
question = input("You: ").strip()
if not question:
print("Error: Input cannot be empty.")
return None
return question
except EOFError:
# Ctrl+D pressed
return 'CTRL_D'
except KeyboardInterrupt:
print("\nOperation cancelled by user.")
sys.exit(0)
def send_question_to_python(question, conversation_id):
"""
Process the question using the orchestrator.
Args:
question (str): The user's question.
conversation_id (str): The conversation ID.
Returns:
dict: The response from the orchestrator.
"""
# Use default client principal information
client_principal = {
'id': '00000000-0000-0000-0000-000000000123',
'name': 'anonymous',
'group_names': ''
}
# Call orchestrator
if question:
try:
orchestrator = Orchestrator(conversation_id, client_principal)
result = asyncio.run(orchestrator.answer(question))
if not isinstance(result, dict):
logger.error("Expected result to be a dictionary.")
return {"error": "Invalid response format from orchestrator."}
return result
except Exception as e:
logger.exception(f"An error occurred while orchestrating the question: {e}")
return {"error": "An error occurred while processing your question."}
else:
logger.warning("No question provided to orchestrate.")
return {"error": "No question provided."}
def send_question_to_rest_api(uri, x_functions_key, question, conversation_id):
"""
Send the question to the orchestrator API and return the response.
Args:
uri (str): The API endpoint URI.
x_functions_key (str): The API access key.
question (str): The question to send.
conversation_id (str): The conversation ID.
Returns:
dict: The API response parsed as a JSON object.
"""
headers = {
'x-functions-key': x_functions_key,
'Content-Type': 'application/json'
}
body = {
'conversation_id': conversation_id,
'question': question
}
try:
response = requests.post(uri, headers=headers, json=body)
response.raise_for_status() # Raises HTTPError for bad responses
try:
response_data = response.json()
if not isinstance(response_data, dict):
logger.error("Response JSON is not a dictionary.")
return {"error": "Invalid response format from orchestrator API."}
return response_data
except json.JSONDecodeError:
logger.error("Response is not valid JSON.")
return {"error": "Response is not valid JSON."}
except requests.exceptions.RequestException as e:
logger.exception(f"HTTP Request failed: {e}")
return {"error": f"HTTP Request failed: {e}"}
def display_answer(answer):
"""
Display the assistant's answer, reasoning, and SQL query extracted from a JSON-formatted string or dictionary.
Args:
answer (dict): The assistant's answer in dictionary format.
"""
if not answer:
logger.warning("No answer provided.")
print("No answer provided.")
return
# ANSI escape sequences for colors
BLUE = '\033[94m'
GREY = '\033[90m'
RESET = '\033[0m'
try:
# Ensure the answer is a dictionary
if isinstance(answer, str):
answer = json.loads(answer)
if not isinstance(answer, dict):
logger.error("Parsed JSON is not a dictionary.")
print("Assistant: The provided answer is not in the expected JSON object format.")
return
# Extract keys with default messages if keys are missing
assistant_answer = answer.get("answer", "No answer provided.")
assistant_reasoning = answer.get("reasoning", "No reasoning provided.")
assistant_data_points = answer.get("data_points", "No data points provided.")
print(f"{BLUE}Answer: {assistant_answer}{RESET}")
print(f"{BLUE}Reasoning: {GREY}{assistant_reasoning}{RESET}")
print(f"{BLUE}Data Points: {GREY}{assistant_data_points}{RESET}")
except json.JSONDecodeError as e:
logger.error(f"JSON decoding failed: {e}")
print("Assistant: Unable to parse the answer due to invalid JSON format.")
except Exception as e:
logger.exception(f"An unexpected error occurred: {e}")
print("Assistant: An unexpected error occurred while processing the answer.")
def display_thoughts_and_data_points(response_data):
"""
Display the thoughts and data_points from the API response.
Args:
response_data (dict): The API response as a JSON object.
"""
thoughts = response_data.get('thoughts', '')
reasoning = response_data.get('reasoning', '')
data_points = response_data.get('data_points', '')
if thoughts or data_points or reasoning:
BRIGHT_CYAN = '\033[96m'
RESET = '\033[0m'
print(f"{BRIGHT_CYAN}\n--- Agent Group Chat from Last Response ---")
if data_points:
print("\nReasoning:")
print(reasoning)
if thoughts:
print("\nThoughts:")
print(thoughts)
if data_points:
print("\nData Points:")
print(data_points)
print("---------------------------------------------------\n")
print(f"{RESET}")
else:
logger.info("No thoughts or data points in the last response.")
def main():
"""
Main function to execute the script logic.
"""
conversation_id = ""
last_response_data = None
while True:
user_input = get_user_input()
if user_input == 'CTRL_D':
# Display thoughts and data_points from last_response_data
if last_response_data:
display_thoughts_and_data_points(last_response_data)
else:
print("No previous response to display thoughts and data points.")
continue
elif user_input is None:
continue
else:
use_rest_api = os.getenv('USE_REST_API', "False").lower() == "true"
if use_rest_api:
uri, x_functions_key = get_rest_api_config()
response_data = send_question_to_rest_api(
uri, x_functions_key, user_input, conversation_id)
else:
response_data = send_question_to_python(user_input, conversation_id)
if 'error' in response_data:
print(f"Error: {response_data['error']}")
logger.error(f"Error in response: {response_data['error']}")
continue
last_response_data = response_data
# Update conversation_id
if 'conversation_id' in response_data:
conversation_id = response_data['conversation_id']
else:
logger.warning("No conversation_id in response data.")
# Display only the answer
display_answer(response_data)
if __name__ == '__main__':
try:
main()
except Exception as e:
logger.exception(f"An unhandled exception occurred: {e}")
sys.exit(1)