netbench-orchestrator/src/ec2_utils/instance.rs (17 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use crate::{ ec2_utils::{ launch_plan::LaunchPlan, types::{Az, EndpointType, HostIps, PrivIp, PubIp}, }, orchestrator::{HostConfig, OrchError, OrchResult, OrchestratorConfig, STATE}, }; use aws_sdk_ec2::types::{ BlockDeviceMapping, EbsBlockDevice, IamInstanceProfileSpecification, Instance, InstanceNetworkInterfaceSpecification, InstanceStateName, InstanceType, PlacementGroup, ResourceType, ShutdownBehavior, Tag, TagSpecification, }; use std::{collections::HashMap, net::IpAddr, str::FromStr, time::Duration}; use tracing::{debug, info}; pub async fn launch_instances( ec2_client: &aws_sdk_ec2::Client, launch_plan: &LaunchPlan<'_>, security_group_id: &str, unique_id: &str, host_config: &HostConfig, placement_map: &HashMap<Az, PlacementGroup>, endpoint_type: EndpointType, ) -> OrchResult<Instance> { let instance_type = InstanceType::from(host_config.instance_type().as_str()); let subnet_id = launch_plan .networking_detail .get(&host_config.az.clone().into()) .ok_or(OrchError::Ec2 { dbg: "Subnet not found".to_string(), })?; let placement = host_config.to_ec2_placement(placement_map)?; let launch_request = ec2_client .run_instances() .placement(placement) .set_key_name(STATE.ssh_key_name.map(|s| s.to_string())) .iam_instance_profile( IamInstanceProfileSpecification::builder() .arn(&launch_plan.instance_profile_arn) .build(), ) .instance_type(instance_type) .image_id(&launch_plan.ami_id) .instance_initiated_shutdown_behavior(ShutdownBehavior::Terminate) // give the instances human readable names. name is set via tags .tag_specifications( TagSpecification::builder() .resource_type(ResourceType::Instance) .tags( Tag::builder() .key("Name") .value(instance_name(unique_id, endpoint_type)) .build(), ) .build(), ) .block_device_mappings( BlockDeviceMapping::builder() .device_name("/dev/xvda") .ebs( EbsBlockDevice::builder() .delete_on_termination(true) .volume_size(50) .build(), ) .build(), ) .network_interfaces( InstanceNetworkInterfaceSpecification::builder() .associate_public_ip_address(true) .delete_on_termination(true) .device_index(0) .subnet_id(subnet_id.as_string()) .groups(security_group_id) .build(), ) .min_count(1_i32) .max_count(1_i32) .send() .await .map_err(|err| OrchError::Ec2 { dbg: format!("{:#?}", err), })?; let instance = launch_request.instances(); // Get the launched instance instance .first() .ok_or(OrchError::Ec2 { dbg: "Failed to launch instance".to_string(), }) .cloned() } fn instance_name(unique_id: &str, endpoint_type: EndpointType) -> String { format!("{}_{}", endpoint_type.as_str().to_lowercase(), unique_id) } // Wait for running state pub async fn poll_running( ec2_client: &aws_sdk_ec2::Client, instance: &Instance, launch_cnt: usize, endpoint_type: &EndpointType, ) -> OrchResult<HostIps> { let mut actual_instance_state = InstanceStateName::Pending; let mut host_ip = None; let mut attempt = 1; while actual_instance_state != InstanceStateName::Running { let instance_id = instance.instance_id().expect("describe_instances failed"); let result = ec2_client .describe_instances() .instance_ids(instance_id) .send() .await .map_err(|err| OrchError::Ec2 { dbg: err.to_string(), })?; let instance = result .reservations() .first() .and_then(|reservation| reservation.instances().first()) .expect("failed to get instance"); // Get public and private ips host_ip = instance .private_ip_address() .and_then(|ip| IpAddr::from_str(ip).ok()) .and_then(|private_ip| { instance .public_ip_address() .and_then(|ip| IpAddr::from_str(ip).ok()) .map(|public_ip| HostIps::new(PrivIp(private_ip), PubIp(public_ip))) }); // Get the current instance state actual_instance_state = instance .state() .and_then(|state| state.name()) .expect("Failed to get instance state") .clone(); debug!("poll attempt: {:?}", attempt); attempt += 1; info!( "{:?} {} state: {:?}", endpoint_type, launch_cnt, actual_instance_state ); tokio::time::sleep(Duration::from_secs(1)).await; } host_ip.ok_or(OrchError::Ec2 { dbg: "Failed to launch EC2 host".to_string(), }) } pub async fn get_instance_profile( iam_client: &aws_sdk_iam::Client, config: &OrchestratorConfig, ) -> OrchResult<String> { let instance_profile_arn = iam_client .get_instance_profile() .instance_profile_name(config.cdk_config.netbench_runner_instance_profile()) .send() .await .map_err(|err| OrchError::Iam { dbg: err.to_string(), })?; Ok(instance_profile_arn .instance_profile() .expect("instance_profile failed") .arn() .to_string()) } pub async fn get_latest_ami(ssm_client: &aws_sdk_ssm::Client) -> OrchResult<String> { let ami_id = ssm_client .get_parameter() .name(STATE.ami_name) .with_decryption(true) .send() .await .map_err(|err| OrchError::Ssm { dbg: err.to_string(), })? .parameter() .expect("expected ami value") .value() .expect("expected ami value") .into(); Ok(ami_id) }