in stack/tools/src/main/java/org/apache/usergrid/tools/ShardManager.java [111:241]
/**
 * Walks the shard metadata of one edge type hanging off an application's head entity,
 * logging every shard and optionally applying a repair task to the shards it visits.
 *
 * <p>Options read from {@code line}:
 * <ul>
 *   <li>{@code APPLICATION_ARG} - application UUID (required).</li>
 *   <li>{@code ENTITY_TYPE_ARG} - entity type; pluralized to derive the collection name.</li>
 *   <li>{@code SHARD_TYPE_ARG} - {@code "collection"} (also used when the value is absent or
 *       unrecognized) or {@code "connection"}; selects how the edge type name is derived.</li>
 *   <li>{@code REPAIR_TASK} - one of {@code removeAllShardEnds}, {@code removeLastShardEnd},
 *       {@code resetAllCompactionStatus}, {@code setCompacted}, {@code clearCompacted}
 *       (case-insensitive). Any other value leaves the run read-only.</li>
 *   <li>{@code SHARD_INDEX_ARG} - shard index targeted by {@code setCompacted} /
 *       {@code clearCompacted}; without it those tasks modify nothing.</li>
 * </ul>
 *
 * <p>When a repair task is active, the shard metadata is read a second time after the repair
 * pass and logged so the changes can be verified.
 *
 * @param line the parsed command line
 * @throws RuntimeException if the application id option is missing or empty
 * @throws IllegalArgumentException if the application id is not a valid UUID string
 * @throws NumberFormatException if a shard index is needed for the task but is not numeric
 * @throws Exception if starting Spring or reading/writing shard metadata fails
 */
public void runTool(CommandLine line) throws Exception {
    startSpring();

    // The option value may be null when the flag was not supplied at all, so check for null
    // before calling isEmpty(); otherwise the intended error message is masked by an NPE.
    final String applicationArg = line.getOptionValue(APPLICATION_ARG);
    if (applicationArg == null || applicationArg.isEmpty()) {
        throw new RuntimeException("Application ID not provided.");
    }
    final UUID app = UUID.fromString(applicationArg);

    final String entityType = line.getOptionValue(ENTITY_TYPE_ARG);
    final String repairTask = line.getOptionValue(REPAIR_TASK);
    final String shardType = line.getOptionValue(SHARD_TYPE_ARG);
    final String shardIndex = line.getOptionValue(SHARD_INDEX_ARG);

    // Literal-first comparisons are null-safe, so a missing repair task simply matches nothing.
    final boolean removeAllShardEnds = "removeAllShardEnds".equalsIgnoreCase(repairTask);
    final boolean removeLastShardEnd = "removeLastShardEnd".equalsIgnoreCase(repairTask);
    final boolean resetAllCompactionStatus = "resetAllCompactionStatus".equalsIgnoreCase(repairTask);
    final boolean setCompacted = "setCompacted".equalsIgnoreCase(repairTask);
    final boolean clearCompacted = "clearCompacted".equalsIgnoreCase(repairTask);
    final boolean repair = removeAllShardEnds || removeLastShardEnd || resetAllCompactionStatus
            || setCompacted || clearCompacted;

    logger.info("Starting Tool: ShardManager");
    logger.info("Using Cassandra consistency level: {}", System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM"));

    if (!repair && isNotEmpty(repairTask)) {
        logger.warn("Unrecognized repair task '{}'; no shards will be modified.", repairTask);
    }

    // Parse the target index once, up front, and only for the tasks that actually use it.
    // A malformed value therefore fails before any shard has been written.
    final Long targetShardIndex =
            (setCompacted || clearCompacted) && isNotEmpty(shardIndex) ? Long.valueOf(shardIndex) : null;
    if ((setCompacted || clearCompacted) && targetShardIndex == null) {
        logger.warn("Repair task '{}' requires a shard index; no shards will be modified.", repairTask);
    }

    final EntityRef headEntity = new SimpleEntityRef("application", app);
    final ApplicationScope applicationScope = new ApplicationScopeImpl(new SimpleId(app, "application"));
    final EdgeShardSerialization edgeShardSerialization = injector.getInstance(EdgeShardSerialization.class);

    final String collectionName = InflectionUtils.pluralize(entityType);

    // Default to a collection edge; only an explicit "connection" shard type changes that.
    String metaType = CpNamingUtils.getEdgeTypeFromCollectionName(collectionName);
    if (isNotEmpty(shardType) && shardType.equalsIgnoreCase("connection")) {
        metaType = CpNamingUtils.getEdgeTypeFromConnectionType(entityType);
    }

    final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode(headEntity.asId(), metaType);

    final Iterator<Shard> shards =
            edgeShardSerialization.getShardMetaData(applicationScope, Optional.absent(), directedEdgeMeta);

    // "removeLastShardEnd" only touches the first shard handed back by the iterator.
    boolean firstShard = true;
    while (shards.hasNext()) {
        final Shard shard = shards.next();
        logger.info("Seeking over shard: {}", shard);

        if (!repair) {
            continue;
        }
        logger.info("Repair enabled with task: {}", repairTask);

        // The task flags are mutually exclusive, so at most one branch mutates the shard.
        boolean changed = false;
        if (setCompacted || clearCompacted) {
            if (targetShardIndex != null && targetShardIndex.equals(shard.getShardIndex())) {
                if (setCompacted) {
                    logger.info("Setting compacted=true for shard: {}", shard);
                } else {
                    logger.info("Setting compacted=false for shard: {}", shard);
                }
                shard.setCompacted(setCompacted);
                changed = true;
            }
        } else if (removeAllShardEnds || (removeLastShardEnd && firstShard)) {
            logger.info("Removing shard end from shard: {}", shard);
            shard.setShardEnd(Optional.absent());
            changed = true;
        } else if (resetAllCompactionStatus && shard.getShardIndex() != 0) {
            // The shard with index 0 is deliberately left as-is by this task.
            logger.info("Setting compacted=false for shard: {}", shard);
            shard.setCompacted(false);
            changed = true;
        }

        if (changed) {
            edgeShardSerialization.writeShardMeta(applicationScope, shard, directedEdgeMeta).execute();
        }
        firstShard = false;
    }

    if (repair) {
        // do a final walk-through so changes can be verified
        final Iterator<Shard> repairedShards =
                edgeShardSerialization.getShardMetaData(applicationScope, Optional.absent(), directedEdgeMeta);
        while (repairedShards.hasNext()) {
            logger.info("Shard after repair: {}", repairedShards.next());
        }
    }

    logger.info("ShardManager run complete");
}