in aws-blog-hbase-on-emr/hbase-connector/src/main/java/com/amazonaws/hbase/kinesis/utils/EMRUtils.java [62:125]
/**
 * Returns the ID of an EMR cluster to use, launching a new HBase-enabled cluster if
 * {@code clusterIdentifier} does not already exist.
 *
 * <p>When {@code clusterExists} reports the identifier as present, it is returned unchanged and
 * no arguments are validated. Otherwise the arguments are validated, a job flow is submitted
 * with the "Install HBase" bootstrap action plus the debugging, "Start HBase" and backup-schedule
 * steps, and this method blocks, polling {@code clusterState} every 10 seconds, until the
 * cluster reports the {@code WAITING} state.
 *
 * <p>If the cluster ends up terminated instead of waiting, an error is logged and the JVM is
 * stopped with {@code System.exit(-1)}.
 *
 * <p>An interrupt received while polling does not abort the wait; the thread's interrupt flag is
 * restored before this method returns so the caller can still observe it.
 *
 * @param client             EMR client used to submit the job flow and poll its state
 * @param clusterIdentifier  identifier of a cluster to reuse if it exists
 * @param amiVersion         AMI version for the new cluster; must be non-empty
 * @param keypair            EC2 key pair name; must be non-empty
 * @param masterInstanceType instance type of the master node; must be non-empty
 * @param coreInstanceType   instance type passed as the slave instance type (not validated here)
 * @param logUri             log location for the cluster; must be non-empty
 * @param numberOfNodes      instance count for the cluster; must be at least 1
 * @return {@code clusterIdentifier} if that cluster already exists, otherwise the ID of the
 *         newly created job flow
 * @throws IllegalArgumentException if a required argument is missing or {@code numberOfNodes}
 *         is less than 1
 */
public static String createCluster(AmazonElasticMapReduce client,
        String clusterIdentifier,
        String amiVersion,
        String keypair,
        String masterInstanceType,
        String coreInstanceType,
        String logUri,
        int numberOfNodes) {
    if (clusterExists(client, clusterIdentifier)) {
        LOG.info("Cluster " + clusterIdentifier + " is available");
        return clusterIdentifier;
    }

    // Argument validation. IllegalArgumentException is a RuntimeException, so callers that
    // catch RuntimeException keep working.
    if (amiVersion == null || amiVersion.isEmpty()) {
        throw new IllegalArgumentException("ERROR: Please specify an AMI Version");
    }
    if (keypair == null || keypair.isEmpty()) {
        throw new IllegalArgumentException("ERROR: Please specify a valid Amazon Key Pair");
    }
    if (masterInstanceType == null || masterInstanceType.isEmpty()) {
        throw new IllegalArgumentException("ERROR: Please specify a Master Instance Type");
    }
    if (logUri == null || logUri.isEmpty()) {
        throw new IllegalArgumentException("ERROR: Please specify a valid Amazon S3 bucket for your logs.");
    }
    // The message requires at least one node, so zero must be rejected as well as negatives.
    if (numberOfNodes < 1) {
        throw new IllegalArgumentException("ERROR: Please specify at least 1 node");
    }

    RunJobFlowRequest request = new RunJobFlowRequest()
            .withAmiVersion(amiVersion)
            .withBootstrapActions(new BootstrapActionConfig()
                    .withName("Install HBase")
                    .withScriptBootstrapAction(new ScriptBootstrapActionConfig()
                            .withPath("s3://elasticmapreduce/bootstrap-actions/setup-hbase")))
            .withName("Job Flow With HBAse Actions")
            .withSteps(new StepConfig() // enable debugging step
                            .withName("Enable debugging")
                            .withActionOnFailure("TERMINATE_CLUSTER")
                            .withHadoopJarStep(new StepFactory().newEnableDebuggingStep()),
                    // Start HBase step - after installing it with a bootstrap action
                    createStepConfig("Start HBase", "TERMINATE_CLUSTER", "/home/hadoop/lib/hbase.jar", getHBaseArgs()),
                    // Add HBase backup step.
                    // NOTE(review): this step uses "TERMINATE_JOB_FLOW" while the others use
                    // "TERMINATE_CLUSTER" - confirm whether the difference is intentional.
                    createStepConfig("Modify backup schedule", "TERMINATE_JOB_FLOW", "/home/hadoop/lib/hbase.jar", getHBaseBackupArgs()))
            .withLogUri(logUri)
            .withInstances(new JobFlowInstancesConfig()
                    .withEc2KeyName(keypair)
                    .withInstanceCount(numberOfNodes)
                    .withKeepJobFlowAliveWhenNoSteps(true)
                    .withMasterInstanceType(masterInstanceType)
                    .withSlaveInstanceType(coreInstanceType));

    RunJobFlowResult result = client.runJobFlow(request);
    String jobFlowId = result.getJobFlowId();

    // Poll until the cluster is WAITING. An interrupt does not cancel the wait (the cluster
    // has already been requested), but it is remembered and re-asserted on the way out.
    boolean interrupted = false;
    try {
        String state;
        while (!(state = clusterState(client, jobFlowId)).equalsIgnoreCase("waiting")) {
            // Check for a dead cluster on the freshly fetched state, before sleeping.
            // A terminated cluster can never reach WAITING, so looping on would never end.
            if (state.equalsIgnoreCase("TERMINATED_WITH_ERRORS") || state.equalsIgnoreCase("TERMINATED")) {
                LOG.error("Could not create EMR Cluster");
                System.exit(-1);
            }
            LOG.info(jobFlowId + " is " + state + ". Waiting for cluster to become available.");
            try {
                Thread.sleep(10 * 1000);
            } catch (InterruptedException e) {
                // Thread.sleep cleared the flag; remember it so it can be restored below.
                interrupted = true;
            }
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    LOG.info("Created cluster " + jobFlowId);
    // Report the cluster that was actually created; clusterIdentifier did not exist.
    LOG.info("Cluster " + jobFlowId + " is available");
    return jobFlowId;
}