public void runTool()

in stack/tools/src/main/java/org/apache/usergrid/tools/CollectionIterator.java [135:308]


    /**
     * Iterates over the graph edges that lead from the application node to the entities of one
     * collection, logs what it finds for every edge, and optionally deletes problematic edges.
     *
     * <p>For each edge the target entity is loaded through the entity manager and one of the
     * following is logged:
     * <ul>
     *   <li>{@code ENTITY} - the entity exists and its UUID was not seen before in this run;</li>
     *   <li>{@code DUPLICATE ENTITY} - the entity exists but an earlier edge in this run already
     *       pointed at the same UUID; the edge is deleted only when the remove-duplicates option
     *       is {@code yes};</li>
     *   <li>{@code NOT FOUND} - the entity could not be loaded; the edge is deleted only when the
     *       remove-orphans option is {@code yes} and the timestamp taken from the entity UUID lies
     *       within {@code [earliestTimestamp, latestTimestamp]}.</li>
     * </ul>
     *
     * <p>The upper bound of the window is the explicit latest-timestamp option, or "now minus the
     * seconds-in-past option", or "now minus {@code DEFAULT_SECONDS_IN_PAST}" when neither is
     * given. The lower bound defaults to 0.
     *
     * @param line parsed command line holding the tool options
     * @throws RuntimeException if the application ID or entity type is missing, if both the latest
     *                          timestamp and seconds-in-past options are supplied, or if a numeric
     *                          option cannot be parsed as a long
     * @throws IllegalArgumentException if the application ID is not a valid UUID string
     * @throws Exception declared for compatibility with the tool entry-point signature
     */
    public void runTool( CommandLine line ) throws Exception {

        startSpring();

        final String applicationOption = line.getOptionValue(APPLICATION_ARG);
        final String entityTypeOption = line.getOptionValue(ENTITY_TYPE_ARG);
        final String removeOrphanConnectionsOption = line.getOptionValue(REMOVE_ORPHAN_CONNECTIONS_ARG);
        final String removeDuplicateConnectionsOption = line.getOptionValue(REMOVE_DUPLICATE_CONNECTIONS_ARG);
        final String earliestTimestampOption = line.getOptionValue(EARLIEST_TIMESTAMP_ARG);
        final String latestTimestampOption = line.getOptionValue(LATEST_TIMESTAMP_ARG);
        final String secondsInPastOption = line.getOptionValue(SECONDS_IN_PAST_ARG);

        if (isBlank(applicationOption)) {
            throw new RuntimeException("Application ID not provided.");
        }
        final UUID app = UUID.fromString(applicationOption);

        if (isBlank(entityTypeOption)) {
            throw new RuntimeException("Entity type (singular collection name) not provided.");
        }
        final String entityType = entityTypeOption;

        // Destructive behaviour is opt-in: only a case-insensitive "yes" enables it.
        // equalsIgnoreCase is null-safe here and does not depend on the default locale.
        final boolean removeOrphans = "yes".equalsIgnoreCase(removeOrphanConnectionsOption);
        final boolean removeDuplicates = "yes".equalsIgnoreCase(removeDuplicateConnectionsOption);

        if (!isBlank(secondsInPastOption) && !isBlank(latestTimestampOption)) {
            throw new RuntimeException("Can't specify both latest timestamp and seconds in past options.");
        }

        final long earliestTimestamp = isBlank(earliestTimestampOption)
            ? 0L
            : parseLongOption(earliestTimestampOption, "earliest timestamp");

        final long currentTimestamp = System.currentTimeMillis();

        // Upper bound of the removal window: explicit timestamp wins, then an explicit
        // "seconds in past" offset, otherwise the DEFAULT_SECONDS_IN_PAST offset from now.
        final long latestTimestamp;
        if (!isBlank(latestTimestampOption)) {
            latestTimestamp = parseLongOption(latestTimestampOption, "latest timestamp");
        } else if (!isBlank(secondsInPastOption)) {
            latestTimestamp = currentTimestamp - (parseLongOption(secondsInPastOption, "seconds in past") * 1000L);
        } else {
            latestTimestamp = currentTimestamp - (DEFAULT_SECONDS_IN_PAST * 1000L);
        }

        logger.info("Starting Tool: CollectionIterator");
        logger.info("Orphans {} be deleted", removeOrphans ? "WILL" : "will not");
        logger.info("Duplicates {} be deleted", removeDuplicates ? "WILL" : "will not");
        logger.info("Timestamp range {} to {}", earliestTimestamp, latestTimestamp);
        logger.info("Using Cassandra consistency level: {}", System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM"));

        em = emf.getEntityManager( app );
        EntityRef headEntity = new SimpleEntityRef("application", app);

        // NOTE(review): collectionService and collection (below) are never read in this method.
        // They are retained only in case the lookups have initialization side effects -
        // confirm and remove.
        CollectionService collectionService = injector.getInstance(CollectionService.class);
        String collectionName = InflectionUtils.pluralize(entityType);
        String simpleEdgeType = CpNamingUtils.getEdgeTypeFromCollectionName(collectionName);
        logger.info("simpleEdgeType: {}", simpleEdgeType);

        ApplicationScope applicationScope = new ApplicationScopeImpl(new SimpleId(app, "application"));
        Id applicationScopeId = applicationScope.getApplication();
        logger.info("applicationScope.getApplication(): {}", applicationScopeId);
        EdgeSerialization edgeSerialization = injector.getInstance(EdgeSerialization.class);

        CollectionInfo collection = getDefaultSchema().getCollection(headEntity.getType(), collectionName);

        GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class );
        GraphManager gm = gmf.createEdgeManager(applicationScope);

        // Search all edges of the collection's edge type starting at the application node,
        // newest first (DESCENDING from Long.MAX_VALUE).
        final SimpleSearchByEdgeType search =
            new SimpleSearchByEdgeType( applicationScopeId, simpleEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
                Optional.absent(), false );

        // Target UUIDs already encountered in this run; a repeat marks the edge as a duplicate.
        final Set<UUID> seenUuids = new HashSet<>();

        gm.loadEdgesFromSource(search).map(markedEdge -> {

            final UUID uuid = markedEdge.getTargetNode().getUuid();
            final long edgeTimestamp = markedEdge.getTimestamp();
            final String edgeType = markedEdge.getType();
            // Set.add returns false when the element was already present.
            final boolean duplicate = !seenUuids.add(uuid);

            try {
                EntityRef entityRef = new SimpleEntityRef(entityType, uuid);
                org.apache.usergrid.persistence.Entity retrieved = em.get(entityRef);

                // A fresh formatter per edge: SimpleDateFormat is not thread-safe and the
                // thread this callback runs on is not controlled here.
                DateFormat df = new SimpleDateFormat();
                df.setTimeZone(TimeZone.getTimeZone("GMT"));

                // Timestamp embedded in the entity UUID; stays 0 when the UUID is not time-based.
                long timestamp = 0;
                String dateString = "NOT TIME-BASED";
                if (UUIDUtils.isTimeBased(uuid)) {
                    timestamp = UUIDUtils.getTimestampInMillis(uuid);
                    dateString = df.format(new Date(timestamp)) + " GMT";
                }
                Date uuidEdgeDate = new Date(UUIDUtils.getUnixTimestampInMillisFromUUIDTimestamp(edgeTimestamp));
                String edgeDateString = df.format(uuidEdgeDate) + " GMT";

                if (retrieved != null) {

                    if (!duplicate) {
                        logger.info("ENTITY: uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{}",
                            uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString);
                    } else if (removeDuplicates) {
                        logger.info("DUPLICATE ENTITY (REMOVING): uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{}",
                            uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString);
                        try {
                            MutationBatch batch = edgeSerialization.deleteEdge(applicationScope, markedEdge, UUIDUtils.newTimeUUID());
                            logger.info("BATCH: {}", batch);
                            batch.execute();
                        } catch (Exception e) {
                            // Best effort: a failed delete must not stop the iteration.
                            logger.error("{} - exception while trying to remove duplicate connection, {}", uuid, e.getMessage(), e);
                        }
                    } else {
                        logger.info("DUPLICATE ENTITY (WON'T REMOVE): uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{}",
                            uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString);
                    }

                } else {
                    // NOTE(review): for a non-time-based UUID timestamp is 0, so with the default
                    // earliest bound of 0 such an orphan falls inside the window and is removed -
                    // confirm this is intended.
                    final boolean inWindow = timestamp >= earliestTimestamp && timestamp <= latestTimestamp;

                    if (removeOrphans && inWindow) {
                        logger.info("NOT FOUND (REMOVING): uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{} - isDeleted:{} isSourceNodeDeleted:{} isTargetNodeDeleted:{}",
                            uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString, markedEdge.isDeleted(), markedEdge.isSourceNodeDelete(), markedEdge.isTargetNodeDeleted());
                        try {
                            MutationBatch batch = edgeSerialization.deleteEdge(applicationScope, markedEdge, UUIDUtils.newTimeUUID());
                            logger.info("BATCH: {}", batch);
                            batch.execute();
                        } catch (Exception e) {
                            // Best effort: a failed delete must not stop the iteration.
                            logger.error("{} - exception while trying to remove orphaned connection, {}", uuid, e.getMessage(), e);
                        }
                    } else if (removeOrphans) {
                        logger.info("NOT FOUND (TIMESTAMP OUT OF RANGE): uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{} - isDeleted:{} isSourceNodeDeleted:{} isTargetNodeDeleted:{}",
                            uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString, markedEdge.isDeleted(), markedEdge.isSourceNodeDelete(), markedEdge.isTargetNodeDeleted());
                    } else {
                        logger.info("NOT FOUND: uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{} - isDeleted:{} isSourceNodeDeleted:{} isTargetNodeDeleted:{}",
                            uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString, markedEdge.isDeleted(), markedEdge.isSourceNodeDelete(), markedEdge.isTargetNodeDeleted());
                    }
                }
            } catch (Exception e) {
                // Keep going with the next edge; the throwable is passed so the stack trace is logged.
                logger.error("{} - exception while trying to load entity data, {} ", uuid, e.getMessage(), e);
            }

            return markedEdge;
        }).toBlocking().lastOrDefault(null); // block until the whole edge stream has been processed

    }

    /**
     * Parses a numeric command-line option value.
     *
     * @param value       raw, non-blank option text
     * @param description human-readable option name used in the error message
     * @return the parsed value
     * @throws RuntimeException if {@code value} is not a valid long; the original
     *                          {@link NumberFormatException} is attached as the cause
     */
    private static long parseLongOption(String value, String description) {
        try {
            return Long.parseLong(value);
        } catch (NumberFormatException e) {
            throw new RuntimeException("Cannot convert " + description + " to long: " + value, e);
        }
    }