netbench-orchestrator/src/ssm_utils/coordination_utils.rs (220 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use crate::{ ec2_utils::{InfraDetail, PubIp}, orchestrator::OrchestratorConfig, russula::{ self, netbench::{client, server}, WorkflowBuilder, WorkflowState, }, ssm_utils, ssm_utils::NetbenchDriverType, OrchError, OrchResult, STATE, }; use aws_sdk_ssm::operation::send_command::SendCommandOutput; use core::time::Duration; use indicatif::{ProgressBar, ProgressStyle}; use std::{collections::BTreeSet, net::SocketAddr}; use tracing::{debug, info}; fn get_progress_bar(msg: String) -> ProgressBar { let bar = ProgressBar::new(0); let style = ProgressStyle::with_template("{spinner} [{elapsed_precise}] {msg}") .unwrap() .tick_chars("⠁⠂⠄⡀⢀⠠⠐⠈ "); bar.set_style(style); bar.enable_steady_tick(Duration::from_secs(1)); bar.set_message(msg); bar } pub struct ServerNetbenchRussula { // used to poll the remote worker via ssm worker: SendCommandOutput, coord: russula::Workflow<server::CoordWorkflow>, driver_name: String, } impl ServerNetbenchRussula { pub async fn new( ssm_client: &aws_sdk_ssm::Client, infra: &InfraDetail, scenario: &OrchestratorConfig, driver: &NetbenchDriverType, ) -> OrchResult<Self> { debug!("starting server worker"); let instance_ids = infra.server_ids(); let worker = ssm_utils::server::run_russula_worker(ssm_client, instance_ids, driver, scenario) .await?; // wait for worker to start tokio::time::sleep(STATE.poll_delay_ssm).await; // server coord debug!("starting server coordinator"); let coord = server_coord(infra.public_server_ips()).await?; Ok(ServerNetbenchRussula { worker, coord, driver_name: driver.trim_driver_name(), }) } // Poll till netbench is running on the server hosts. pub async fn wait_netbench_running( &mut self, ssm_client: &aws_sdk_ssm::Client, ) -> OrchResult<()> { let msg = format!("{}: Waiting for server state Running.", self.driver_name); let bar = get_progress_bar(msg); let cmd_id = self.worker.command().unwrap().command_id().unwrap(); loop { let poll_worker = ssm_utils::poll_ssm_results("server", ssm_client, cmd_id).await?; let poll_coord_worker_running = self .coord .poll_state(WorkflowState::WorkerRunning) .await .map_err(|err| OrchError::Russula { dbg: err.to_string(), })?; debug!( "Server Russula!: poll worker_running. Coordinator: {:?} Worker {:?}", poll_coord_worker_running, poll_worker ); if poll_coord_worker_running.is_ready() { break; } tokio::time::sleep(STATE.poll_delay_ssm).await; } bar.finish(); Ok(()) } // Continue to poll the server worker and coordinator till it is done pub async fn wait_done(&mut self, ssm_client: &aws_sdk_ssm::Client) -> OrchResult<()> { let msg = format!("{}: Waiting for server state Done.", self.driver_name); let bar = get_progress_bar(msg); let cmd_id = self.worker.command().unwrap().command_id().unwrap(); loop { let poll_worker = ssm_utils::poll_ssm_results("server", ssm_client, cmd_id).await?; let poll_coord_done = self.coord .poll_state(WorkflowState::Done) .await .map_err(|err| OrchError::Russula { dbg: err.to_string(), })?; debug!( "Server Russula!: Coordinator: {:?} Worker {:?}", poll_coord_done, poll_worker ); // Since the workers are executed via SSM, there is a delay in detecting // when they finish. In practice it's not absolutely necessary to wait // for the workers to finish. // // ``` // // wait for both coordinator and workers to finish // poll_coord_done.is_ready() && poll_worker.is_ready() // ``` if poll_coord_done.is_ready() { break; } tokio::time::sleep(STATE.poll_delay_ssm).await; } bar.finish(); info!("Server Russula!: Successful"); Ok(()) } } pub struct ClientNetbenchRussula { // used to poll the remote worker via ssm worker: SendCommandOutput, coord: russula::Workflow<client::CoordWorkflow>, driver_name: String, } impl ClientNetbenchRussula { pub async fn new( ssm_client: &aws_sdk_ssm::Client, infra: &InfraDetail, scenario: &OrchestratorConfig, driver: &NetbenchDriverType, ) -> OrchResult<Self> { let instance_ids = infra.client_ids(); debug!("starting client worker"); let worker = ssm_utils::client::run_russula_worker( ssm_client, instance_ids, infra.private_server_ips(), driver, scenario, ) .await?; // wait for worker to start tokio::time::sleep(STATE.poll_delay_ssm).await; // client coord debug!("starting client coordinator"); let coord = client_coord(infra.public_client_ips()).await?; Ok(ClientNetbenchRussula { worker, coord, driver_name: driver.trim_driver_name(), }) } // Continue to poll the client worker and coordinator till it is done pub async fn wait_done(&mut self, ssm_client: &aws_sdk_ssm::Client) -> OrchResult<()> { let msg = format!("{}: Waiting for client state Done.", self.driver_name); let bar = get_progress_bar(msg); let cmd_id = self.worker.command().unwrap().command_id().unwrap(); loop { let poll_worker = ssm_utils::poll_ssm_results("client", ssm_client, cmd_id).await?; let poll_coord = self .coord .poll_state(WorkflowState::Done) .await .map_err(|err| OrchError::Russula { dbg: err.to_string(), })?; debug!( "Client Russula!: Coordinator: {:?} Worker {:?}", poll_coord, poll_worker ); // Since the workers are executed via SSM, there is a delay in detecting // when they finish. In practice it's not absolutely necessary to wait // for the workers to finish. // // ``` // // wait for both coordinator and workers to finish // poll_coord_done.is_ready() && poll_worker.is_ready() // ``` if poll_coord.is_ready() { break; } tokio::time::sleep(STATE.poll_delay_ssm).await; } bar.finish(); info!("Client Russula!: Successful"); Ok(()) } } async fn server_coord( server_ips: Vec<&PubIp>, ) -> OrchResult<russula::Workflow<server::CoordWorkflow>> { let server_addr: Vec<SocketAddr> = server_ips .iter() .map(|ip| SocketAddr::new(ip.0, STATE.russula_port)) .collect(); let server_coord = WorkflowBuilder::new( BTreeSet::from_iter(server_addr), server::CoordWorkflow::new(), STATE.poll_delay_russula, ); let mut server_coord = server_coord .build() .await .map_err(|err| OrchError::Russula { dbg: err.to_string(), })?; // Attempt to connect to the peer server_coord .run_till(WorkflowState::Ready) .await .map_err(|err| OrchError::Russula { dbg: err.to_string(), })?; info!("server coord Ready"); Ok(server_coord) } async fn client_coord( client_ips: Vec<&PubIp>, ) -> OrchResult<russula::Workflow<client::CoordWorkflow>> { let client_addr: Vec<SocketAddr> = client_ips .iter() .map(|ip| SocketAddr::new(ip.0, STATE.russula_port)) .collect(); let client_coord = WorkflowBuilder::new( BTreeSet::from_iter(client_addr), client::CoordWorkflow::new(), STATE.poll_delay_russula, ); let mut client_coord = client_coord .build() .await .map_err(|err| OrchError::Russula { dbg: err.to_string(), })?; // Attempt to connect to the peer client_coord .run_till(WorkflowState::Ready) .await .map_err(|err| OrchError::Russula { dbg: err.to_string(), })?; info!("client coord Ready"); Ok(client_coord) }