in controller/src/main/java/org/apache/airavata/mft/controller/spawner/EC2AgentSpawner.java [91:297]
/**
 * Asynchronously provisions an EC2 instance and starts an MFT Agent on it.
 *
 * <p>The work is submitted to {@code executor} and the resulting future is stored in
 * {@code launchFuture}; the future yields the generated agent id. The task performs these steps:
 * <ol>
 *   <li>Builds an EC2 client for the region of the S3 storage, using the S3 access/secret key.</li>
 *   <li>Uses an image named {@code mft-agent} if one is found, otherwise the AMI from
 *       {@code getAmi(region)}; in the latter case the agent distribution is installed over SSH.</li>
 *   <li>Creates the {@code MFTAgentSecurityGroup} security group (TCP port 22 open) if it is missing.</li>
 *   <li>Reuses an EC2 key pair whose name matches a file in {@code ~/.mft/keys}, or creates a new
 *       key pair and stores its private key there.</li>
 *   <li>Launches the instance, waits until it is running with a public IP, configures the agent id,
 *       sets up the SSH port forward and starts the agent daemon.</li>
 * </ol>
 *
 * <p>If anything fails after the instance has been launched, the instance is terminated. Any failure
 * inside the provisioning steps surfaces from the future as a {@link RuntimeException} that wraps
 * the original cause.
 */
public void launch() {
    launchFuture = executor.submit(() -> {
        String region = storageWrapper.getS3().getRegion();
        String imageId = getAmi(region); // Ubuntu base image
        String keyNamePrefix = "mft-aws-agent-key-";
        String secGroupName = "MFTAgentSecurityGroup";
        String agentId = UUID.randomUUID().toString();
        String systemUser = "ubuntu";
        String mftKeyDir = System.getProperty("user.home") + File.separator + ".mft" + File.separator + "keys";
        String accessKey = secretWrapper.getS3().getAccessKey();
        String secretKey = secretWrapper.getS3().getSecretKey();

        try {
            BasicAWSCredentials awsCreds = new BasicAWSCredentials(accessKey, secretKey);
            AmazonEC2 amazonEC2 = AmazonEC2ClientBuilder.standard()
                    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
                            "https://ec2." + region + ".amazonaws.com", region))
                    .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
                    .build();

            // --- Image selection: prefer an image named "mft-agent" over the base AMI ---
            boolean preSavedImage = false;
            DescribeImagesRequest describeImagesRequest = new DescribeImagesRequest().withFilters(
                    new ArrayList<>());
            describeImagesRequest.getFilters().add(new Filter().withName("name").withValues("mft-agent"));
            List<Image> images = amazonEC2.describeImages(describeImagesRequest).getImages();
            if (!images.isEmpty()) {
                imageId = images.get(0).getImageId();
                preSavedImage = true;
                logger.info("Using already created image {}", imageId);
            }

            // --- Security group: create it with SSH ingress when it does not exist yet ---
            List<SecurityGroup> securityGroups =
                    amazonEC2.describeSecurityGroups(new DescribeSecurityGroupsRequest()).getSecurityGroups();
            boolean hasMftSecGroup = securityGroups.stream()
                    .anyMatch(sg -> secGroupName.equals(sg.getGroupName()));
            if (!hasMftSecGroup) {
                CreateSecurityGroupRequest csgr = new CreateSecurityGroupRequest();
                csgr.withGroupName(secGroupName).withDescription("MFT Agent Security Group");
                amazonEC2.createSecurityGroup(csgr);

                // Port 22 is opened to every IPv4 address so the controller can reach the VM over SSH.
                IpPermission ipPermission = new IpPermission();
                IpRange ipRange1 = new IpRange().withCidrIp("0.0.0.0/0");
                ipPermission.withIpv4Ranges(Collections.singletonList(ipRange1))
                        .withIpProtocol("tcp")
                        .withFromPort(22)
                        .withToPort(22);
                AuthorizeSecurityGroupIngressRequest authorizeSecurityGroupIngressRequest =
                        new AuthorizeSecurityGroupIngressRequest();
                authorizeSecurityGroupIngressRequest.withGroupName(secGroupName)
                        .withIpPermissions(ipPermission);
                amazonEC2.authorizeSecurityGroupIngress(authorizeSecurityGroupIngressRequest);
            }

            // --- Key pair: reuse one whose private key file is present locally, else create one ---
            List<String> localKeys = new ArrayList<>();
            if (Files.isDirectory(Path.of(mftKeyDir))) {
                // Files.list holds an open directory handle until the stream is closed.
                try (Stream<Path> keyPaths = Files.list(Path.of(mftKeyDir))) {
                    keyPaths.forEach(p -> localKeys.add(p.getFileName().toString()));
                }
            }

            Optional<KeyPairInfo> availableKeyPair = Optional.empty();
            if (!localKeys.isEmpty()) {
                DescribeKeyPairsResult keyPairs = amazonEC2.describeKeyPairs();
                availableKeyPair = keyPairs.getKeyPairs().stream()
                        .filter(kp -> localKeys.contains(kp.getKeyName()))
                        .findFirst();
            }

            String keyName;
            if (availableKeyPair.isPresent()) {
                keyName = availableKeyPair.get().getKeyName();
                logger.info("Using existing key pair {}", keyName);
            } else {
                logger.info("Creating Key pair");
                keyName = keyNamePrefix + UUID.randomUUID().toString();
                CreateKeyPairRequest createKeyPairRequest = new CreateKeyPairRequest();
                createKeyPairRequest.withKeyName(keyName);
                KeyPair keyPair = amazonEC2.createKeyPair(createKeyPairRequest).getKeyPair();
                String privateKey = keyPair.getKeyMaterial();
                Files.createDirectories(Path.of(mftKeyDir));
                // NOTE(review): the private key is written with the process' default file permissions;
                // consider restricting it to the owner.
                Files.write(Path.of(mftKeyDir, keyName), privateKey.getBytes(StandardCharsets.UTF_8));
                logger.info("Created key pair {}", keyName);
            }

            // --- Launch the instance ---
            RunInstancesRequest runInstancesRequest = new RunInstancesRequest();
            runInstancesRequest.withImageId(imageId)
                    .withInstanceType(InstanceType.T1Micro) // TODO Externalize
                    .withMinCount(1)
                    .withMaxCount(1)
                    .withKeyName(keyName)
                    .withTagSpecifications(
                            new TagSpecification().withResourceType(ResourceType.Instance)
                                    .withTags(new Tag().withKey("Type").withValue("MFT-Agent"),
                                            new Tag().withKey("AgentId").withValue(agentId),
                                            new Tag().withKey("Name").withValue("MFT-Agent")))
                    .withSecurityGroups(secGroupName);

            logger.info("Launching the EC2 VM to start Agent {}", agentId);
            RunInstancesResult result = amazonEC2.runInstances(runInstancesRequest);
            instanceId = result.getReservation().getInstances().get(0).getInstanceId();

            // From here on an instance exists, so any failure must terminate it again.
            try {
                DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest();
                describeInstancesRequest.setInstanceIds(Collections.singletonList(instanceId));

                String publicIpAddress = null;
                boolean instanceReady = false;

                logger.info("Waiting until instance {} is ready", instanceId);
                // Poll up to 30 times, 2 seconds apart, for the "running" state and a public IP.
                for (int i = 0; i < 30; i++) {
                    DescribeInstancesResult describeInstancesResult =
                            amazonEC2.describeInstances(describeInstancesRequest);
                    boolean instanceListed = !describeInstancesResult.getReservations().isEmpty()
                            && !describeInstancesResult.getReservations().get(0).getInstances().isEmpty();
                    if (instanceListed) {
                        Instance instance = describeInstancesResult.getReservations().get(0).getInstances().get(0);
                        InstanceState instanceState = instance.getState();
                        publicIpAddress = instance.getPublicIpAddress();
                        logger.info("Instance state {}, public ip {}", instanceState.getName(), publicIpAddress);
                        if ("running".equals(instanceState.getName()) && publicIpAddress != null) {
                            instanceReady = true;
                            break;
                        }
                    } else {
                        // Keep polling instead of failing on an empty describe result.
                        logger.info("Instance {} is not listed yet", instanceId);
                    }
                    Thread.sleep(2000);
                }

                // Fail fast: there is no point waiting for SSH on an instance that never became ready.
                if (!instanceReady) {
                    logger.info("Instance {} was not setup properly", instanceId);
                    throw new IllegalStateException("Instance " + instanceId + " was not setup properly");
                }

                logger.info("Waiting 30 seconds until the ssh interface comes up in instance {}", instanceId);
                Thread.sleep(30000);

                sshProvider = new SSHProvider();
                sshProvider.initConnection(publicIpAddress, 22,
                        Path.of(mftKeyDir, keyName).toAbsolutePath().toString(), systemUser);

                logger.info("Created SSH Connection. Installing dependencies...");

                // A base image needs the JRE, unzip and the agent distribution installed first.
                if (!preSavedImage) {
                    runAgentSetupCommand("sudo apt update -y",
                            "Failed to update apt for VM");
                    runAgentSetupCommand("sudo apt install -y openjdk-11-jre-headless",
                            "Failed to install jdk on new VM");
                    runAgentSetupCommand("sudo apt install -y unzip",
                            "Failed to install unzip on new VM");
                    runAgentSetupCommand("wget https://github.com/apache/airavata-mft/releases/download/v0.0.1/MFT-Agent-0.01-bin.zip",
                            "Failed to download mft distribution");
                    runAgentSetupCommand("unzip MFT-Agent-0.01-bin.zip",
                            "Failed to unzip mft distribution");
                }

                // NOTE(review): GNU sed parses "-ir" as "-i" with backup suffix "r" (not "-i -r"),
                // so this leaves an "application.propertiesr" backup next to the config file.
                runAgentSetupCommand("sed -ir \"s/^[#]*\\s*agent.id=.*/agent.id=" + agentId + "/\" /home/ubuntu/MFT-Agent-0.01/conf/application.properties",
                        "Failed to update agent id in config file");

                portForwardLock = new CountDownLatch(1);
                CountDownLatch portForwardPendingLock = sshProvider.createSshPortForward(8500, portForwardLock);

                logger.info("Waiting until the port forward is setup");
                portForwardPendingLock.await();

                runAgentSetupCommand("sh MFT-Agent-0.01/bin/agent-daemon.sh start",
                        "Failed to start the MFT Agent");

                // Waiting 10 seconds to start the Agent
                Thread.sleep(10000);
            } catch (Exception e) {
                logger.error("Failed preparing instance {}. Deleting the instance", instanceId);
                try {
                    TerminateInstancesRequest terminateInstancesRequest = new TerminateInstancesRequest();
                    terminateInstancesRequest.setInstanceIds(Collections.singleton(instanceId));
                    amazonEC2.terminateInstances(terminateInstancesRequest);
                } catch (Exception terminateFailure) {
                    // Keep the original failure as the primary error; do not let cleanup mask it.
                    logger.error("Failed to terminate instance {}", instanceId, terminateFailure);
                    e.addSuppressed(terminateFailure);
                }
                throw e;
            }

            return agentId;
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag only after cleanup so the terminate call is not aborted by it.
                Thread.currentThread().interrupt();
            }
            logger.error("Failed to spin up the EC2 Agent", e);
            throw new RuntimeException("Failed to spin up the EC2 Agent", e);
        }
    });
}

/**
 * Runs a shell command on the agent VM through the current {@code sshProvider} connection and
 * fails when the command does not exit with status 0.
 *
 * @param command        the command line to execute on the VM
 * @param failureMessage the message of the {@link IOException} raised on a non-zero exit code
 * @throws IOException if the command exits with a non-zero status
 * @throws Exception   whatever {@code sshProvider.runCommand} itself raises
 */
private void runAgentSetupCommand(String command, String failureMessage) throws Exception {
    int exitCode = sshProvider.runCommand(command);
    if (exitCode != 0) {
        throw new IOException(failureMessage);
    }
}