netbench-orchestrator/src/ec2_utils/networking.rs (221 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use crate::{
ec2_utils::{
launch_plan::NetworkingInfraDetail,
types::{Az, SubnetId, VpcId},
InfraDetail, PlacementGroup,
},
orchestrator::{OrchError, OrchResult, OrchestratorConfig, STATE},
};
use aws_sdk_ec2::types::{
Filter, IpPermission, IpRange, PlacementStrategy, ResourceType, TagSpecification,
UserIdGroupPair,
};
use std::collections::HashMap;
use tracing::info;
/// Configures egress/ingress rules on the run's security group so hosts can
/// talk to each other freely, accept SSH, and reach the russula control port.
///
/// Egress allows all traffic that stays within the security group. Ingress
/// allows intra-group traffic, each host's public /32, SSH (22), and the
/// russula coordinator/worker port.
pub async fn set_routing_permissions(
    ec2_client: &aws_sdk_ec2::Client,
    infra: &InfraDetail,
) -> OrchResult<()> {
    let security_group_id = &infra.security_group_id;
    // Pair referring back to this same security group; membership in the
    // group is what authorizes the intra-group traffic below.
    let self_group_pair = UserIdGroupPair::builder()
        .set_group_id(Some(security_group_id.clone()))
        .build();
    // Permission allowing all protocols/ports between members of the group.
    let intra_group_permission = IpPermission::builder()
        .from_port(-1)
        .to_port(-1)
        .ip_protocol("-1")
        .user_id_group_pairs(self_group_pair)
        .build();

    // Egress: only intra-group traffic needs to be authorized.
    ec2_client
        .authorize_security_group_egress()
        .group_id(security_group_id.clone())
        .ip_permissions(intra_group_permission.clone())
        .send()
        .await
        .map_err(|err| OrchError::Ec2 {
            dbg: format!("Failed to set egress permissions: {err}"),
        })?;

    let ssh_ip_range = IpRange::builder().cidr_ip("0.0.0.0/0").build();
    // TODO only specify the russula ports
    let russula_ip_range = IpRange::builder().cidr_ip("0.0.0.0/0").build();
    // One /32 range per host (clients and servers alike) so hosts can reach
    // each other over their public addresses.
    let mut public_host_ip_ranges = Vec::new();
    for instance_detail in infra.clients.iter().chain(infra.servers.iter()) {
        info!("{}", instance_detail);
        let host_range = IpRange::builder()
            .cidr_ip(format!("{}/32", instance_detail.host_ips().public_ip()))
            .build();
        public_host_ip_ranges.push(host_range);
    }

    // Ingress: intra-group traffic, all host public IPs, SSH, and russula.
    ec2_client
        .authorize_security_group_ingress()
        .group_id(security_group_id.clone())
        .ip_permissions(intra_group_permission)
        .ip_permissions(
            // Authorize all host ips
            IpPermission::builder()
                .from_port(-1)
                .to_port(-1)
                .ip_protocol("-1")
                .set_ip_ranges(Some(public_host_ip_ranges))
                .build(),
        )
        .ip_permissions(
            // Authorize port 22 (ssh)
            IpPermission::builder()
                .from_port(22)
                .to_port(22)
                .ip_protocol("tcp")
                .ip_ranges(ssh_ip_range)
                .build(),
        )
        .ip_permissions(
            // Authorize russula ports (Coordinator <-> Workers)
            IpPermission::builder()
                .from_port(STATE.russula_port.into())
                .to_port(STATE.russula_port.into())
                .ip_protocol("tcp")
                .ip_ranges(russula_ip_range)
                .build(),
        )
        .send()
        .await
        .map_err(|err| OrchError::Ec2 {
            dbg: format!("Failed to set ingress permissions: {err}"),
        })?;
    Ok(())
}
/// Creates the per-run security group inside the given VPC and returns its id.
///
/// One security group is created per VPC, and there is a single VPC per
/// region. The group is tagged with a run-unique `Name` derived from
/// `unique_id`.
pub async fn create_security_group(
    ec2_client: &aws_sdk_ec2::Client,
    vpc_id: &VpcId,
    unique_id: &str,
) -> OrchResult<String> {
    let group_name = STATE.security_group_name(unique_id);
    // Tag the group with its own name so it is identifiable in the console.
    let name_tag = aws_sdk_ec2::types::Tag::builder()
        .key("Name")
        .value(group_name.clone())
        .build();
    let response = ec2_client
        .create_security_group()
        .group_name(group_name)
        .description("This is a security group for a single run of netbench.")
        .vpc_id(vpc_id.as_string())
        .tag_specifications(
            TagSpecification::builder()
                .resource_type(ResourceType::SecurityGroup)
                .tags(name_tag)
                .build(),
        )
        .send()
        .await
        .map_err(|err| OrchError::Ec2 {
            dbg: err.to_string(),
        })?;
    // The SDK models the returned id as optional; treat absence as failure.
    let security_group_id = response
        .group_id()
        .ok_or_else(|| OrchError::Ec2 {
            dbg: "Failed to create security group".to_owned(),
        })?
        .to_owned();
    Ok(security_group_id)
}
/// Discovers the netbench runner subnets (one per AZ) and the VPC they share.
///
/// Returns a map from availability zone to subnet id along with the common
/// VPC id.
///
/// # Errors
/// Fails if the subnets cannot be described, if any subnet is missing an
/// AZ/subnet/VPC id, or if a client/server host config references an AZ for
/// which no subnet was found.
///
/// # Panics
/// Panics if no subnets match the filter, or if the discovered subnets do not
/// all belong to the same VPC (both indicate broken infrastructure setup).
pub async fn get_subnet_vpc_ids(
    ec2_client: &aws_sdk_ec2::Client,
    config: &OrchestratorConfig,
) -> OrchResult<(NetworkingInfraDetail, VpcId)> {
    let subnets = ec2_client
        .describe_subnets()
        .filters(
            Filter::builder()
                .name(config.cdk_config.netbench_runner_subnet_tag_key())
                .values(config.cdk_config.netbench_runner_subnet_tag_value())
                .build(),
        )
        .send()
        .await
        .map_err(|e| OrchError::Ec2 {
            dbg: format!("Couldn't describe subnets: {:#?}", e),
        })?;
    let subnets = subnets.subnets();
    assert!(!subnets.is_empty(), "No subnets found");
    tracing::debug!("{:?}", subnets);
    let mut az_subnet_map = HashMap::new();
    let mut vpc_id = None;
    for subnet in subnets.iter() {
        let az = Az::from(
            subnet
                .availability_zone()
                .ok_or(OrchError::Ec2 {
                    dbg: "Couldn't find AZ".into(),
                })?
                .to_owned(),
        );
        let subnet_id = SubnetId::from(
            subnet
                .subnet_id()
                .ok_or(OrchError::Ec2 {
                    dbg: "Couldn't find subnet".into(),
                })?
                .to_owned(),
        );
        let subnet_vpc_id = VpcId::from(
            subnet
                .vpc_id()
                .ok_or(OrchError::Ec2 {
                    dbg: "Couldn't find vpc".into(),
                })?
                .to_owned(),
        );
        // all subnets should have the same VPC id
        if let Some(ref vpc_id) = vpc_id {
            assert_eq!(vpc_id, &subnet_vpc_id);
        }
        vpc_id = Some(subnet_vpc_id);
        az_subnet_map.insert(az, subnet_id);
    }
    let vpc_id = vpc_id.expect("VPC id should be set at this point");
    // Validate that we have a subnet for each AZ referenced by any client or
    // server host config (previously two duplicated loops).
    for host_config in config
        .client_config
        .iter()
        .chain(config.server_config.iter())
    {
        let az = Az::from(host_config.az.clone());
        if !az_subnet_map.contains_key(&az) {
            return Err(OrchError::Ec2 {
                // Fix: was a plain string literal, so "{az}" appeared
                // verbatim in the error message instead of the AZ name.
                dbg: format!("Subnet not found for Az: {az}"),
            });
        }
    }
    Ok((az_subnet_map, vpc_id))
}
/// Creates a cluster-strategy placement group for the given AZ.
///
/// The group name is derived from `unique_id` and the AZ so each run/AZ pair
/// gets its own group. Returns the created `PlacementGroup` or an EC2 error.
pub async fn create_placement_group(
    ec2_client: &aws_sdk_ec2::Client,
    az: &Az,
    unique_id: &str,
) -> OrchResult<PlacementGroup> {
    let group_name = format!("cluster-{}-{}", unique_id, az);
    let response = ec2_client
        .create_placement_group()
        .group_name(group_name)
        .strategy(PlacementStrategy::Cluster)
        .send()
        .await
        .map_err(|err| OrchError::Ec2 {
            dbg: format!("{}", err),
        })?;
    // The SDK models the created group as optional; treat absence as an error.
    match response.placement_group() {
        Some(group) => Ok(group.clone()),
        None => Err(OrchError::Ec2 {
            dbg: "Failed to retrieve placement_group".to_string(),
        }),
    }
}