public void runTool()

in stack/tools/src/main/java/org/apache/usergrid/tools/ShardManager.java [111:241]


    public void runTool(CommandLine line) throws Exception {

        startSpring();

        if (line.getOptionValue(APPLICATION_ARG).isEmpty()) {
            throw new RuntimeException("Application ID not provided.");
        }
        final UUID app = UUID.fromString(line.getOptionValue(APPLICATION_ARG));

        String entityType = line.getOptionValue(ENTITY_TYPE_ARG);

        String repairTask = line.getOptionValue(REPAIR_TASK);

        String shardType = line.getOptionValue(SHARD_TYPE_ARG);

        String shardIndex = line.getOptionValue(SHARD_INDEX_ARG);

        boolean repair = false;
        if( isNotEmpty(repairTask) && (
            repairTask.equalsIgnoreCase("removeAllShardEnds") || repairTask.equalsIgnoreCase("removeLastShardEnd") ||
            repairTask.equalsIgnoreCase("resetAllCompactionStatus") || repairTask.equalsIgnoreCase("setCompacted") ||
                repairTask.equalsIgnoreCase("clearCompacted"))) {

            repair = true;
        }


        logger.info("Starting Tool: ShardManager");
        logger.info("Using Cassandra consistency level: {}", System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM"));

        EntityRef headEntity = new SimpleEntityRef("application", app);

        ApplicationScope applicationScope = new ApplicationScopeImpl(new SimpleId(app, "application"));
        EdgeShardSerialization edgeShardSerialization = injector.getInstance(EdgeShardSerialization.class);

        String collectionName = InflectionUtils.pluralize(entityType);

        // default to assume collection
        String metaType = CpNamingUtils.getEdgeTypeFromCollectionName(collectionName);

            if( isNotEmpty(shardType) ){

            if( shardType.equalsIgnoreCase("collection")){
                metaType = CpNamingUtils.getEdgeTypeFromCollectionName(collectionName);

            }else if( shardType.equalsIgnoreCase("connection")){
                metaType = CpNamingUtils.getEdgeTypeFromConnectionType(entityType);
            }
        }


        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode(headEntity.asId(), metaType);

        Iterator<Shard> shards = edgeShardSerialization.getShardMetaData(applicationScope, Optional.absent(), directedEdgeMeta);

        boolean firstShard = true;
        while (shards.hasNext()) {
            Shard shard = shards.next();


            logger.info("Seeking over shard: {}", shard);

            if(repair) {

                logger.info("Repair enabled with task: {}", repairTask);

                if ( repairTask.equalsIgnoreCase("setCompacted") && isNotEmpty(shardIndex)  && Long.valueOf(shardIndex).equals(shard.getShardIndex())){

                    logger.info("Setting compacted=true for shard: {}", shard);

                    shard.setCompacted(true);
                    edgeShardSerialization.writeShardMeta(applicationScope, shard, directedEdgeMeta).execute();


                }else if ( repairTask.equalsIgnoreCase("clearCompacted") && isNotEmpty(shardIndex)  && Long.valueOf(shardIndex).equals(shard.getShardIndex())){

                    logger.info("Setting compacted=false for shard: {}", shard);

                    shard.setCompacted(false);
                    edgeShardSerialization.writeShardMeta(applicationScope, shard, directedEdgeMeta).execute();


                } else if( repairTask.equalsIgnoreCase("removeLastShardEnd") && firstShard){

                    logger.info("Removing shard end from shard: {}", shard);

                    shard.setShardEnd(Optional.absent());
                    edgeShardSerialization.writeShardMeta(applicationScope, shard, directedEdgeMeta).execute();

                }else if ( repairTask.equalsIgnoreCase("removeAllShardEnds")){

                    logger.info("Removing shard end from shard: {}", shard);

                    shard.setShardEnd(Optional.absent());
                    edgeShardSerialization.writeShardMeta(applicationScope, shard, directedEdgeMeta).execute();

                } else if ( repairTask.equalsIgnoreCase("resetAllCompactionStatus") && shard.getShardIndex() != 0){

                    logger.info("Setting compacted=false for shard: {}", shard);

                    shard.setCompacted(false);
                    edgeShardSerialization.writeShardMeta(applicationScope, shard, directedEdgeMeta).execute();

                }

                firstShard = false;


            }

        }


        if(repair) {
            // do a final walk-through so changes can be verified
            Iterator<Shard> finalshards = edgeShardSerialization.getShardMetaData(applicationScope, Optional.absent(), directedEdgeMeta);

            while (finalshards.hasNext()) {

                Shard shard = finalshards.next();

                logger.info("Shard after repair: {}", shard);

            }
        }


        logger.info("ShardManager run complete");


    }