modules/agent-framework/airavata-agent/kernel.py (100 lines of code) (raw):
import time
from jupyter_client import KernelManager
from flask import Flask, request, jsonify
import os
import json
import re
app = Flask(__name__)
km = None
kc = None
kernel_running = False
ansi_escape = re.compile(r'\x1B\[[0-?]*[ -/]*[@-~]')
@app.route('/start', methods=['GET'])
def start_kernel():
global km
global kc
global kernel_running
if kernel_running:
return "Kernel already running"
# Create a new kernel manager
km = KernelManager(kernel_name='python3')
km.start_kernel()
# Create a client to interact with the kernel
kc = km.client()
kc.start_channels()
# Ensure the client is connected before executing code
kc.wait_for_ready()
kernel_running = True
return "Kernel started"
def strip_ansi_codes(text):
return ansi_escape.sub('', text)
@app.route('/execute', methods=['POST'])
def execute():
global km
global kc
code = request.json.get('code', '')
if not code:
return jsonify({'error': 'No code provided'}), 400
kc.execute(code)
outputs = []
execution_noticed = False
while True:
try:
msg = kc.get_iopub_msg(timeout=5)
content = msg.get("content", {})
msg_type = msg.get("msg_type", "")
# When a message with the text stream comes and it's the result of our execution
if msg_type == "execute_input":
execution_noticed = True
# Handle stdout streams
if msg_type == "stream" and content.get("name") == "stdout":
outputs.append({
"output_type": "stream",
"name": "stdout",
"text": content.get("text", "")
})
# Handle stderr streams
if msg_type == "stream" and content.get("name") == "stderr":
outputs.append({
"output_type": "stream",
"name": "stderr",
"text": content.get("text", "")
})
# Handle display data (e.g. plots)
if msg_type == "display_data":
outputs.append({
"output_type": "display_data",
"data": content.get("data", {}),
"metadata": content.get("metadata", {})
})
# Handle execution results (e.g. return values)
if msg_type == "execute_result":
outputs.append({
"output_type": "execute_result",
"data": content.get("data", {}),
"metadata": content.get("metadata", {}),
"execution_count": content.get("execution_count", None)
})
# Handle errors
if msg_type == "error":
# Strip ANSI codes from traceback
clean_traceback = [strip_ansi_codes(line) for line in content.get("traceback", [])]
outputs.append({
"output_type": "error",
"ename": content.get("ename", ""),
"evalue": content.get("evalue", ""),
"traceback": clean_traceback
})
# Check for end of execution
if msg_type == "status" and content.get("execution_state") == "idle" and execution_noticed:
break
except KeyboardInterrupt:
return jsonify({'error': "Execution interrupted by user"}), 500
except Exception as e:
print(f"Error while getting Jupyter message: {str(e)}")
response = {
"outputs": outputs
}
return jsonify(response), 200
@app.route('/stop', methods=['GET'])
def stop():
global km
global kc
global kernel_running
if not kernel_running:
return "Kernel is not running to shut down"
kc.stop_channels()
km.shutdown_kernel()
kernel_running = False
return 'Kernel shutting down...'
if __name__ == '__main__':
app.run(port=15000)