in aws-blog-hbase-on-emr/hbase-connector/src/main/java/com/amazonaws/hbase/kinesis/utils/KinesisUtils.java [93:149]
public static void createAndWaitForStreamToBecomeAvailable(AmazonKinesisClient kinesisClient,
String streamName,
int shardCount) {
if (streamExists(kinesisClient, streamName)) {
String state = streamState(kinesisClient, streamName);
switch (state) {
case "DELETING":
long startTime = System.currentTimeMillis();
long endTime = startTime + 1000 * 120;
while (System.currentTimeMillis() < endTime && streamExists(kinesisClient, streamName)) {
try {
LOG.info("...Deleting Stream " + streamName + "...");
Thread.sleep(1000 * 10);
} catch (InterruptedException e) {
}
}
if (streamExists(kinesisClient, streamName)) {
LOG.error("KinesisUtils timed out waiting for stream " + streamName + " to delete");
throw new IllegalStateException("KinesisUtils timed out waiting for stream " + streamName
+ " to delete");
}
case "ACTIVE":
LOG.info("Stream " + streamName + " is ACTIVE");
return;
case "CREATING":
break;
case "UPDATING":
LOG.info("Stream " + streamName + " is UPDATING");
return;
default:
throw new IllegalStateException("Illegal stream state: " + state);
}
} else {
CreateStreamRequest createStreamRequest = new CreateStreamRequest();
createStreamRequest.setStreamName(streamName);
createStreamRequest.setShardCount(shardCount);
kinesisClient.createStream(createStreamRequest);
LOG.info("Stream " + streamName + " created");
}
long startTime = System.currentTimeMillis();
long endTime = startTime + (10 * 60 * 1000);
while (System.currentTimeMillis() < endTime) {
try {
Thread.sleep(1000 * 10);
} catch (Exception e) {
}
try {
String streamStatus = streamState(kinesisClient, streamName);
if (streamStatus.equals("ACTIVE")) {
LOG.info("Stream " + streamName + " is ACTIVE");
return;
}
} catch (ResourceNotFoundException e) {
throw new IllegalStateException("Stream " + streamName + " never went active");
}
}
}