public static String createCluster()

in aws-blog-hbase-on-emr/hbase-connector/src/main/java/com/amazonaws/hbase/kinesis/utils/EMRUtils.java [62:125]


	public static String createCluster(AmazonElasticMapReduce client,
			String clusterIdentifier,
			String amiVersion,
			String keypair,
			String masterInstanceType,
			String coreInstanceType,
			String logUri,
			int numberOfNodes) {

		if (clusterExists(client, clusterIdentifier)) {
			LOG.info("Cluster " + clusterIdentifier + " is available");
			return clusterIdentifier;
		}
		
		//Error checking
		if (amiVersion == null || amiVersion.isEmpty()) throw new RuntimeException("ERROR: Please specify an AMI Version");
		if (keypair == null || keypair.isEmpty()) throw new RuntimeException("ERROR: Please specify a valid Amazon Key Pair");
		if (masterInstanceType == null || masterInstanceType.isEmpty()) throw new RuntimeException("ERROR: Please specify a Master Instance Type");
		if (logUri == null || logUri.isEmpty()) throw new RuntimeException("ERROR: Please specify a valid Amazon S3 bucket for your logs.");
		if (numberOfNodes < 0) throw new RuntimeException("ERROR: Please specify at least 1 node");
		  		
		  RunJobFlowRequest request = new RunJobFlowRequest()
		    .withAmiVersion(amiVersion)
			.withBootstrapActions(new BootstrapActionConfig()
			             .withName("Install HBase")
			             .withScriptBootstrapAction(new ScriptBootstrapActionConfig()
			             .withPath("s3://elasticmapreduce/bootstrap-actions/setup-hbase")))
			.withName("Job Flow With HBAse Actions")	 
			.withSteps(new StepConfig() //enable debugging step
						.withName("Enable debugging")
						.withActionOnFailure("TERMINATE_CLUSTER")
						.withHadoopJarStep(new StepFactory().newEnableDebuggingStep()), 
						//Start HBase step - after installing it with a bootstrap action
						createStepConfig("Start HBase","TERMINATE_CLUSTER", "/home/hadoop/lib/hbase.jar", getHBaseArgs()), 
						//add HBase backup step
						createStepConfig("Modify backup schedule","TERMINATE_JOB_FLOW", "/home/hadoop/lib/hbase.jar", getHBaseBackupArgs()))
			.withLogUri(logUri)
			.withInstances(new JobFlowInstancesConfig()
			.withEc2KeyName(keypair)
			.withInstanceCount(numberOfNodes)
			.withKeepJobFlowAliveWhenNoSteps(true)
			.withMasterInstanceType(masterInstanceType)
			.withSlaveInstanceType(coreInstanceType));

		RunJobFlowResult result = client.runJobFlow(request);
		
		String state = null;
		while (!(state = clusterState(client, result.getJobFlowId())).equalsIgnoreCase("waiting")) {
			try {
				Thread.sleep(10 * 1000);
				LOG.info(result.getJobFlowId() + " is " + state + ". Waiting for cluster to become available.");
			} catch (InterruptedException e) {

			}
			
			if (state.equalsIgnoreCase("TERMINATED_WITH_ERRORS")){
				LOG.error("Could not create EMR Cluster");
				System.exit(-1);	
			}
		}
		LOG.info("Created cluster " + result.getJobFlowId());
		LOG.info("Cluster " + clusterIdentifier + " is available");	
		return result.getJobFlowId();
	}