in modules/agent-framework/airavata-agent/agent.go [23:382]
func main() {
args := os.Args[1:]
serverUrl := args[0]
agentId := args[1]
grpcStreamChannel := make(chan struct{})
kernelChannel := make(chan struct{})
conn, err := grpc.NewClient(serverUrl, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := protos.NewAgentCommunicationServiceClient(conn)
stream, err := c.CreateMessageBus(context.Background())
if err != nil {
log.Fatalf("Error creating stream: %v", err)
}
log.Printf("Trying to connect to %s with agent id %s", serverUrl, agentId)
if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_AgentPing{AgentPing: &protos.AgentPing{AgentId: agentId}}}); err != nil {
log.Fatalf("Failed to connect to the server: %v", err)
} else {
log.Printf("Connected to the server...")
}
go func() {
log.Printf("Starting jupyter kernel")
cmd := exec.Command("micromamba", "run", "-n", "base", "python", "/opt/jupyter/kernel.py")
//cmd := exec.Command("jupyter/venv/bin/python", "jupyter/kernel.py")
stdout, err := cmd.StdoutPipe()
if err != nil {
fmt.Println("[agent.go] Error creating StdoutPipe:", err)
return
}
// Get stderr pipe
stderr, err := cmd.StderrPipe()
if err != nil {
fmt.Println("[agent.go] Error creating StderrPipe:", err)
return
}
log.Printf("[agent.go] Starting command for execution")
// Start the command
if err := cmd.Start(); err != nil {
fmt.Println("[agent.go] Error starting command:", err)
return
}
// Create channels to read from stdout and stderr
stdoutScanner := bufio.NewScanner(stdout)
stderrScanner := bufio.NewScanner(stderr)
// Stream stdout
go func() {
for stdoutScanner.Scan() {
fmt.Printf("[agent.go] stdout: %s\n", stdoutScanner.Text())
}
}()
// Stream stderr
go func() {
for stderrScanner.Scan() {
fmt.Printf("[agent.go] stderr: %s\n", stderrScanner.Text())
}
}()
// Wait for the command to finish
if err := cmd.Wait(); err != nil {
fmt.Println("[agent.go] Error waiting for command:", err)
return
}
fmt.Println("[agent.go] Command finished")
}()
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
close(grpcStreamChannel)
return
}
if err != nil {
log.Fatalf("[agent.go] Failed to receive a message : %v", err)
}
log.Printf("[agent.go] Received message %s", in.Message)
switch x := in.GetMessage().(type) {
case *protos.ServerMessage_PythonExecutionRequest:
log.Printf("[agent.go] Recived a python execution request")
executionId := x.PythonExecutionRequest.ExecutionId
sessionId := x.PythonExecutionRequest.SessionId
code := x.PythonExecutionRequest.Code
workingDir := x.PythonExecutionRequest.WorkingDir
libraries := x.PythonExecutionRequest.Libraries
log.Printf("[agent.go] Execution id %s", executionId)
log.Printf("[agent.go] Session id %s", sessionId)
log.Printf("[agent.go] Code %s", code)
log.Printf("[agent.go] Working Dir %s", workingDir)
log.Printf("[agent.go] Libraries %s", libraries)
go func() {
// setup the venv
venvCmd := fmt.Sprintf(`
agentId="%s"
pkgs="%s"
if [ ! -f "/tmp/$agentId/venv" ]; then
mkdir -p /tmp/$agentId
python3 -m venv /tmp/$agentId/venv
fi
source /tmp/$agentId/venv/bin/activate
python3 -m pip install $pkgs
`, agentId, strings.Join(libraries, " "))
log.Println("[agent.go] venv setup:", venvCmd)
venvExc := exec.Command("bash", "-c", venvCmd)
venvOut, venvErr := venvExc.CombinedOutput()
if venvErr != nil {
fmt.Println("[agent.go] venv setup: ERR", venvErr)
return
}
venvStdout := string(venvOut)
fmt.Println("[agent.go] venv setup:", venvStdout)
// execute the python code
pyCmd := fmt.Sprintf(`
workingDir="%s";
agentId="%s";
cd $workingDir;
source /tmp/$agentId/venv/bin/activate;
python3 <<EOF
%s
EOF`, workingDir, agentId, code)
log.Println("[agent.go] python code:", pyCmd)
pyExc := exec.Command("bash", "-c", pyCmd)
pyOut, pyErr := pyExc.CombinedOutput()
if pyErr != nil {
fmt.Println("[agent.go] python code: ERR", pyErr)
}
// send the result back to the server
pyStdout := string(pyOut)
if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_PythonExecutionResponse{
PythonExecutionResponse: &protos.PythonExecutionResponse{
SessionId: sessionId,
ExecutionId: executionId,
ResponseString: pyStdout}}}); err != nil {
log.Printf("[agent.go] Failed to send execution result to server: %v", err)
} else {
log.Printf("[agent.go] Sent execution result to the server: %v", pyStdout)
}
}()
case *protos.ServerMessage_CommandExecutionRequest:
log.Printf("[agent.go] Recived a command execution request")
executionId := x.CommandExecutionRequest.ExecutionId
execArgs := x.CommandExecutionRequest.Arguments
log.Printf("[agent.go] Execution id %s", executionId)
cmd := exec.Command(execArgs[0], execArgs[1:]...)
log.Printf("[agent.go] Completed execution with the id %s", executionId)
output, err := cmd.CombinedOutput() // combined output of stdout and stderr
if err != nil {
log.Printf("[agent.go] command execution failed: %s", err)
}
outputString := string(output)
log.Printf("[agent.go] Execution output is %s", outputString)
if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_CommandExecutionResponse{
CommandExecutionResponse: &protos.CommandExecutionResponse{ExecutionId: executionId, ResponseString: outputString}}}); err != nil {
log.Printf("[agent.go] Failed to send execution result to server: %v", err)
}
case *protos.ServerMessage_JupyterExecutionRequest:
log.Printf("[agent.go] Recived a jupyter execution request")
executionId := x.JupyterExecutionRequest.ExecutionId
sessionId := x.JupyterExecutionRequest.SessionId
code := x.JupyterExecutionRequest.Code
log.Printf("[agent.go] Execution ID: %s, Session ID: %s, Code: %s", executionId, sessionId, code)
url := "http://127.0.0.1:15000/start"
client := &http.Client{}
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Printf("[agent.go] Failed to create the request start jupyter kernel: %v", err)
jupyterResponse := "Failed while running the cell in remote. Please retry"
if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_JupyterExecutionResponse{
JupyterExecutionResponse: &protos.JupyterExecutionResponse{ExecutionId: executionId, ResponseString: jupyterResponse, SessionId: sessionId}}}); err != nil {
log.Printf("[agent.go] Failed to send jupyter execution result to server: %v", err)
}
return
}
log.Printf("[agent.go] Sending the jupyter kernel start request to server...")
resp, err := client.Do(req)
if err != nil {
log.Printf("[agent.go] Failed to send the request start jupyter kernel: %v", err)
jupyterResponse := "Failed while running the cell in remote. Please retry"
if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_JupyterExecutionResponse{
JupyterExecutionResponse: &protos.JupyterExecutionResponse{ExecutionId: executionId, ResponseString: jupyterResponse, SessionId: sessionId}}}); err != nil {
log.Printf("[agent.go] Failed to send jupyter execution result to server: %v", err)
}
return
}
log.Printf("[agent.go] Successfully sent the jupyter kernel start request to server")
defer func() {
err := resp.Body.Close()
if err != nil {
log.Printf("[agent.go] Failed to close the response body for kernel start: %v", err)
jupyterResponse := "Failed while running the cell in remote. Please retry"
if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_JupyterExecutionResponse{
JupyterExecutionResponse: &protos.JupyterExecutionResponse{ExecutionId: executionId, ResponseString: jupyterResponse, SessionId: sessionId}}}); err != nil {
log.Printf("[agent.go] Failed to send jupyter execution result to server: %v", err)
}
return
}
}()
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Printf("[agent.go] Failed to read response for start jupyter kernel: %v", err)
jupyterResponse := "Failed while running the cell in remote. Please retry"
if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_JupyterExecutionResponse{
JupyterExecutionResponse: &protos.JupyterExecutionResponse{ExecutionId: executionId, ResponseString: jupyterResponse, SessionId: sessionId}}}); err != nil {
log.Printf("[agent.go] Failed to send jupyter execution result to server: %v", err)
}
return
}
log.Printf("[agent.go] Starting to marshal execution request JSON data...")
url = "http://127.0.0.1:15000/execute"
data := map[string]string{
"code": code,
"executionId": executionId,
}
jsonData, err := json.Marshal(data)
if err != nil {
log.Fatalf("[agent.go] Failed to marshal JSON: %v", err)
jupyterResponse := "Failed while running the cell in remote. Please retry"
if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_JupyterExecutionResponse{
JupyterExecutionResponse: &protos.JupyterExecutionResponse{ExecutionId: executionId, ResponseString: jupyterResponse, SessionId: sessionId}}}); err != nil {
log.Printf("[agent.go] Failed to send jupyter execution result to server: %v", err)
}
return
}
log.Printf("[agent.go] Successful marshaling the JSON data")
req, err = http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
log.Printf("[agent.go] Failed to create the request run jupyter kernel: %v", err)
jupyterResponse := "Failed while running the cell in remote. Please retry"
if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_JupyterExecutionResponse{
JupyterExecutionResponse: &protos.JupyterExecutionResponse{ExecutionId: executionId, ResponseString: jupyterResponse, SessionId: sessionId}}}); err != nil {
log.Printf("[agent.go] Failed to send jupyter execution result to server: %v", err)
}
return
}
req.Header.Set("Content-Type", "application/json")
client = &http.Client{}
resp, err = client.Do(req)
if err != nil {
log.Printf("[agent.go] Failed to send the request run jupyter kernel: %v", err)
jupyterResponse := "Failed while running the cell in remote. Please retry"
if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_JupyterExecutionResponse{
JupyterExecutionResponse: &protos.JupyterExecutionResponse{ExecutionId: executionId, ResponseString: jupyterResponse, SessionId: sessionId}}}); err != nil {
log.Printf("[agent.go] Failed to send jupyter execution result to server: %v", err)
}
return
}
defer func() {
log.Printf("[agent.go] Closing the response...")
err := resp.Body.Close()
if err != nil {
log.Printf("[agent.go] Failed to close the response body for kernel execution: %v", err)
jupyterResponse := "Failed while running the cell in remote. Please retry"
if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_JupyterExecutionResponse{
JupyterExecutionResponse: &protos.JupyterExecutionResponse{ExecutionId: executionId, ResponseString: jupyterResponse, SessionId: sessionId}}}); err != nil {
log.Printf("[agent.go] Failed to send jupyter execution result to server: %v", err)
}
return
}
}()
log.Printf("[agent.go] Sending the jupyter execution " + executionId + "result to server...")
body, err = io.ReadAll(resp.Body)
if err != nil {
log.Printf("[agent.go] Failed to read response for run jupyter kernel: %v", err)
jupyterResponse := "Failed while running the cell in remote. Please retry"
if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_JupyterExecutionResponse{
JupyterExecutionResponse: &protos.JupyterExecutionResponse{ExecutionId: executionId, ResponseString: jupyterResponse, SessionId: sessionId}}}); err != nil {
log.Printf("[agent.go] Failed to send jupyter execution result to server: %v", err)
}
return
}
jupyterResponse := string(body)
log.Println("[agent.go] Jupyter execution " + executionId + "response: " + jupyterResponse)
if err := stream.Send(&protos.AgentMessage{Message: &protos.AgentMessage_JupyterExecutionResponse{
JupyterExecutionResponse: &protos.JupyterExecutionResponse{ExecutionId: executionId, ResponseString: jupyterResponse, SessionId: sessionId}}}); err != nil {
log.Printf("[agent.go] Failed to send jupyter execution result to server: %v", err)
}
case *protos.ServerMessage_TunnelCreationRequest:
log.Printf("[agent.go] Received a tunnel creation request")
host := x.TunnelCreationRequest.DestinationHost
destPort := x.TunnelCreationRequest.DestinationPort
srcPort := x.TunnelCreationRequest.SourcePort
keyPath := x.TunnelCreationRequest.SshKeyPath
sshUser := x.TunnelCreationRequest.SshUserName
log.Printf("[agent.go] Tunnel details - Host: %s, DestPort: %s, SrcPort: %s, KeyPath: %s, SSH User: %s", host, destPort, srcPort, keyPath, sshUser)
openRemoteTunnel(host, destPort, srcPort, sshUser, keyPath)
}
}
}()
<-grpcStreamChannel
<-kernelChannel
if err := stream.CloseSend(); err != nil {
log.Fatalf("failed to close the stream: %v", err)
}
}