public void launch()

in controller/src/main/java/org/apache/airavata/mft/controller/spawner/EC2AgentSpawner.java [91:297]


    public void launch() {

        launchFuture = executor.submit( () -> {
            String region = storageWrapper.getS3().getRegion();
            String imageId = getAmi(region); // Ubuntu base image
            String keyNamePrefix = "mft-aws-agent-key-";
            String secGroupName = "MFTAgentSecurityGroup";
            String agentId = UUID.randomUUID().toString();
            String systemUser = "ubuntu";

            String mftKeyDir = System.getProperty("user.home") + File.separator + ".mft" + File.separator + "keys";
            String accessKey = secretWrapper.getS3().getAccessKey();
            String secretKey = secretWrapper.getS3().getSecretKey();

            try {
                BasicAWSCredentials awsCreds = new BasicAWSCredentials(accessKey, secretKey);

                AmazonEC2 amazonEC2 = AmazonEC2ClientBuilder.standard().withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
                                "https://ec2." + region + ".amazonaws.com", region))
                        .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
                        .build();

                boolean preSavedImage = false;
                DescribeImagesRequest describeImagesRequest = new DescribeImagesRequest().withFilters(
                    new ArrayList<>());
                describeImagesRequest.getFilters().add(new Filter().withName("name").withValues("mft-agent"));
                DescribeImagesResult describeImagesResult = amazonEC2.describeImages(
                    describeImagesRequest);
                List<Image> images = describeImagesResult.getImages();
                if (!images.isEmpty()) {
                    Image image = images.get(0);
                    imageId = image.getImageId();
                    preSavedImage = true;
                    logger.info("Using already created image {}", imageId);
                }

                DescribeSecurityGroupsRequest desSecGrp = new DescribeSecurityGroupsRequest();
                DescribeSecurityGroupsResult describeSecurityGroupsResult = amazonEC2.describeSecurityGroups(desSecGrp);
                List<SecurityGroup> securityGroups = describeSecurityGroupsResult.getSecurityGroups();
                boolean hasMftSecGroup = securityGroups.stream().anyMatch(sg -> sg.getGroupName().equals(secGroupName));

                if (!hasMftSecGroup) {
                    CreateSecurityGroupRequest csgr = new CreateSecurityGroupRequest();
                    csgr.withGroupName(secGroupName).withDescription("MFT Agent Security Group");

                    CreateSecurityGroupResult createSecurityGroupResult = amazonEC2.createSecurityGroup(csgr);

                    IpPermission ipPermission = new IpPermission();

                    IpRange ipRange1 = new IpRange().withCidrIp("0.0.0.0/0");

                    ipPermission.withIpv4Ranges(Collections.singletonList(ipRange1))
                            .withIpProtocol("tcp")
                            .withFromPort(22)
                            .withToPort(22);

                    AuthorizeSecurityGroupIngressRequest authorizeSecurityGroupIngressRequest =
                            new AuthorizeSecurityGroupIngressRequest();
                    authorizeSecurityGroupIngressRequest.withGroupName(secGroupName)
                            .withIpPermissions(ipPermission);
                    amazonEC2.authorizeSecurityGroupIngress(authorizeSecurityGroupIngressRequest);
                }

                List<String> localKeys = new ArrayList<>();
                if (Files.isDirectory(Path.of(mftKeyDir))) {
                    Stream<Path> keyPaths = Files.list(Path.of(mftKeyDir));
                    keyPaths.forEach(p -> localKeys.add(p.toFile().getName()));
                }

                Optional<KeyPairInfo> availableKeyPair = Optional.empty();
                if (!localKeys.isEmpty()) {

                    DescribeKeyPairsResult keyPairs = amazonEC2.describeKeyPairs();
                    availableKeyPair = keyPairs.getKeyPairs().stream()
                            .filter(kp -> localKeys.stream()
                                    .anyMatch(lk -> lk.equals(kp.getKeyName()))).findFirst();
                }

                String keyName;

                if (availableKeyPair.isEmpty()) {
                    logger.info("Creating Key pair");
                    keyName = keyNamePrefix + UUID.randomUUID().toString();
                    CreateKeyPairRequest createKeyPairRequest = new CreateKeyPairRequest();

                    createKeyPairRequest.withKeyName(keyName);

                    CreateKeyPairResult createKeyPairResult = amazonEC2.createKeyPair(createKeyPairRequest);

                    KeyPair keyPair = createKeyPairResult.getKeyPair();

                    String privateKey = keyPair.getKeyMaterial();

                    Files.createDirectories(Path.of(mftKeyDir));
                    Files.write(Path.of(mftKeyDir, keyName), privateKey.getBytes(StandardCharsets.UTF_8));
                    logger.info("Created key pair " + keyName);

                } else {
                    keyName = availableKeyPair.get().getKeyName();
                    logger.info("Using existing key pair " + keyName);
                }

                RunInstancesRequest runInstancesRequest = new RunInstancesRequest();

                runInstancesRequest.withImageId(imageId)
                        .withInstanceType(InstanceType.T1Micro) // TODO Externalize
                        .withMinCount(1)
                        .withMaxCount(1)
                        .withKeyName(keyName)
                        .withTagSpecifications(
                                new TagSpecification().withResourceType(ResourceType.Instance)
                                        .withTags(new Tag().withKey("Type").withValue("MFT-Agent"),
                                                new Tag().withKey("AgentId").withValue(agentId),
                                                new Tag().withKey("Name").withValue("MFT-Agent")))
                        .withSecurityGroups(secGroupName);


                logger.info("Launching the EC2 VM to start Agent {}", agentId);
                RunInstancesResult result = amazonEC2.runInstances(runInstancesRequest);

                instanceId = result.getReservation().getInstances().get(0).getInstanceId();

                try {
                    DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest();
                    describeInstancesRequest.setInstanceIds(Collections.singletonList(instanceId));

                    InstanceState instanceState = null;
                    String publicIpAddress = null;

                    logger.info("Waiting until instance {} is ready", instanceId);

                    for (int i = 0; i < 30; i++) {
                        DescribeInstancesResult describeInstancesResult = amazonEC2.describeInstances(describeInstancesRequest);
                        Instance instance = describeInstancesResult.getReservations().get(0).getInstances().get(0);
                        instanceState = instance.getState();
                        publicIpAddress = instance.getPublicIpAddress();

                        logger.info("Instance state {}, public ip {}", instanceState.getName(), publicIpAddress);

                        if (instanceState.getName().equals("running") && publicIpAddress != null) {
                            break;
                        }
                        Thread.sleep(2000);
                    }

                    logger.info("Waiting 30 seconds until the ssh interface comes up in instance {}", instanceId);
                    Thread.sleep(30000);
                    if ("running".equals(instanceState.getName()) && publicIpAddress != null) {
                        sshProvider = new SSHProvider();
                        sshProvider.initConnection(publicIpAddress, 22,
                                Path.of(mftKeyDir, keyName).toAbsolutePath().toString(), systemUser);
                        logger.info("Created SSH Connection. Installing dependencies...");

                        if (! preSavedImage) {
                            int exeCode = sshProvider.runCommand("sudo apt update -y");
                            if (exeCode != 0)
                                throw new IOException("Failed to update apt for VM");
                            exeCode = sshProvider.runCommand("sudo apt install -y openjdk-11-jre-headless");
                            if (exeCode != 0)
                                throw new IOException("Failed to install jdk on new VM");
                            exeCode = sshProvider.runCommand("sudo apt install -y unzip");
                            if (exeCode != 0)
                                throw new IOException("Failed to install unzip on new VM");
                            exeCode = sshProvider.runCommand("wget https://github.com/apache/airavata-mft/releases/download/v0.0.1/MFT-Agent-0.01-bin.zip");
                            if (exeCode != 0)
                                throw new IOException("Failed to download mft distribution");
                            exeCode = sshProvider.runCommand("unzip MFT-Agent-0.01-bin.zip");
                            if (exeCode != 0)
                                throw new IOException("Failed to unzip mft distribution");
                        }

                        int exeCode = sshProvider.runCommand("sed -ir \"s/^[#]*\\s*agent.id=.*/agent.id=" + agentId + "/\" /home/ubuntu/MFT-Agent-0.01/conf/application.properties");
                        if (exeCode != 0)
                            throw new IOException("Failed to update agent id in config file");

                        portForwardLock = new CountDownLatch(1);
                        CountDownLatch portForwardPendingLock = sshProvider.createSshPortForward(8500, portForwardLock);

                        logger.info("Waiting until the port forward is setup");
                        portForwardPendingLock.await();

                        exeCode = sshProvider.runCommand("sh MFT-Agent-0.01/bin/agent-daemon.sh start");
                        if (exeCode != 0)
                            throw new IOException("Failed to start the MFT Agent");

                        // Waiting 10 seconds to start the Agent
                        Thread.sleep(10000);

                    } else {
                        logger.info("Instance {} was not setup properly", instanceId);
                        throw new Exception("Instance " + instanceId + " was not setup properly");
                    }
                } catch (Exception e) {
                    logger.error("Failed preparing instance {}. Deleting the instance", instanceId);
                    TerminateInstancesRequest terminateInstancesRequest = new TerminateInstancesRequest();
                    terminateInstancesRequest.setInstanceIds(Collections.singleton(instanceId));
                    amazonEC2.terminateInstances(terminateInstancesRequest);
                    throw e;
                }

                return agentId;
            } catch (Exception e) {
                logger.error("Failed to spin up the EC2 Agent", e);
                throw new RuntimeException("Failed to spin up the EC2 Agent", e);
            }
        });
    }