in src/main/java/com/amazonaws/services/kinesis/scaling/ShardHashInfo.java [129:160]
public AdjacentShards doSplit(KinesisClient kinesisClient, double targetPct, String currentHighestShardId)
throws Exception {
BigInteger targetHash = getHashAtPctOffset(targetPct);
// split the shard
StreamScalingUtils.splitShard(kinesisClient, this.streamName, this.getShardId(), targetHash, true);
ShardHashInfo lowerShard = null;
ShardHashInfo higherShard = null;
// resolve the newly created shards from this one
Map<String, ShardHashInfo> openShards = StreamScalingUtils.getOpenShards(kinesisClient, streamName,
currentHighestShardId);
for (ShardHashInfo info : openShards.values()) {
if (!info.getShard().shardId().equals(this.shard.shardId())) {
if (info.getShard().hashKeyRange().startingHashKey().equals(targetHash.toString())) {
higherShard = new ShardHashInfo(this.streamName, info.getShard());
break;
} else {
lowerShard = new ShardHashInfo(this.streamName, info.getShard());
}
}
}
if (lowerShard == null || higherShard == null) {
throw new Exception(String.format("Unable to resolve high/low shard mapping for Target Hash Value %s",
targetHash.toString()));
}
return new AdjacentShards(streamName, lowerShard, higherShard);
}