public void runTool()

in stack/tools/src/main/java/org/apache/usergrid/tools/EntityVersionAudit.java [139:275]


    /**
     * Audits the versions of the entities in one collection of an application against Elasticsearch.
     *
     * <p>Walks the collection edges of the application, loads each target entity, iterates its version
     * log entries (only the first one emitted when the "use latest version" option is {@code true}) and
     * looks each version's document up through the index read alias. One CSV row per version is written
     * to {@code entity_version_audit.txt}; one {@code versionCount,entityUUID} row per entity is written
     * to {@code entity_version_agg.txt}. Both files are created in the current working directory.</p>
     *
     * @param line parsed command line; {@code APPLICATION_ARG} and {@code ENTITY_TYPE_ARG} are required,
     *             {@code USE_LATEST_VERSION_ARG} and {@code ENTITY_UUID} are optional
     * @throws RuntimeException         if the application ID or the entity type is blank
     * @throws IllegalArgumentException if the application ID or the entity UUID filter is not a valid UUID
     * @throws Exception                if the output files cannot be opened/written or the edge scan fails
     */
    public void runTool( CommandLine line ) throws Exception {

        logger.info("Starting Tool: EntityVersionAudit");
        logger.info("Using Cassandra consistency level: {}", System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM"));

        startSpring();

        final String applicationOption = line.getOptionValue(APPLICATION_ARG);
        final String entityTypeOption = line.getOptionValue(ENTITY_TYPE_ARG);

        if (isBlank(applicationOption)) {
            throw new RuntimeException("Application ID not provided.");
        }
        final UUID app = UUID.fromString(applicationOption);

        if (isBlank(entityTypeOption)) {
            throw new RuntimeException("Entity type (singular collection name) not provided.");
        }
        final String entityType = entityTypeOption;

        final boolean useLatestVersion = "true".equalsIgnoreCase(line.getOptionValue(USE_LATEST_VERSION_ARG));
        logger.info("useLatestVersion {}", useLatestVersion);

        final String entityUUID = line.getOptionValue(ENTITY_UUID);
        logger.info("entityUUID {}", entityUUID);
        // Parse the optional filter once, up front: a malformed value fails before any output file is
        // created, and the string is not re-parsed for every edge.
        final UUID entityUuidFilter = entityUUID == null ? null : UUID.fromString(entityUUID);

        em = emf.getEntityManager( app );

        final String collectionName = InflectionUtils.pluralize(entityType);
        final String simpleEdgeType = CpNamingUtils.getEdgeTypeFromCollectionName(collectionName);
        logger.info("simpleEdgeType: {}", simpleEdgeType);

        final ApplicationScope applicationScope = new ApplicationScopeImpl(new SimpleId(app, "application"));
        final Id applicationScopeId = applicationScope.getApplication();
        logger.info("applicationScope.getApplication(): {}", applicationScopeId);

        final GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class );
        final GraphManager gm = gmf.createEdgeManager(applicationScope);

        // Named ecmf so it does not shadow the 'emf' field used above to obtain the entity manager.
        final EntityCollectionManagerFactory ecmf = injector.getInstance( EntityCollectionManagerFactory.class );
        final EntityCollectionManager ecm = ecmf.createCollectionManager(applicationScope);

        final SimpleSearchByEdgeType search =
            new SimpleSearchByEdgeType( applicationScopeId, simpleEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
                Optional.absent(), false );

        final IndexLocationStrategyFactory ilsf = injector.getInstance(IndexLocationStrategyFactory.class);
        final EsProvider esProvider = injector.getInstance(EsProvider.class);

        // try-with-resources guarantees both writers are closed even when the scan aborts with an exception.
        try ( Writer versionAuditWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("entity_version_audit.txt"), "utf-8"));
              Writer versionAggWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("entity_version_agg.txt"), "utf-8")) ) {

            versionAuditWriter.write("collection,entityUUID,entityVersion,cassandraTimestamp,elasticsearchTimestamp,indexDelayMillis,existsInElasticsearch\n");
            versionAuditWriter.flush();

            gm.loadEdgesFromSource(search).map(markedEdge -> {

                final UUID uuid = markedEdge.getTargetNode().getUuid();

                if (entityUuidFilter == null || uuid.equals(entityUuidFilter)) {
                    logger.info("matched uuid: {}", uuid);
                    try {
                        final EntityRef entityRef = new SimpleEntityRef(entityType, uuid);
                        final org.apache.usergrid.persistence.Entity retrieved = em.get(entityRef);

                        if ( retrieved != null ) {

                            final Id entityId = retrieved.asId();

                            // The search edge depends only on the entity, not on the version, so build it once.
                            final SearchEdge searchEdge = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(),
                                CpNamingUtils.getEdgeTypeFromCollectionName( InflectionUtils.pluralize( entityId.getType() ) ), entityId,
                                Long.MAX_VALUE ) );

                            final AtomicInteger versionCount = new AtomicInteger();
                            Observable<MvccLogEntry> versionObs = ecm.getVersionsFromMaxToMin( entityId, org.apache.usergrid.utils.UUIDUtils.newTimeUUID() );
                            if (useLatestVersion) {
                                versionObs = versionObs.take(1);
                            }

                            // NOTE(review): the aggregate count written after this call is only complete if
                            // versionObs emits synchronously on subscribe - confirm.
                            versionObs.forEach( mvccLogEntry -> {

                                final IndexLocationStrategy strategy = ilsf.getIndexLocationStrategy(applicationScope);
                                final String readAlias = strategy.getAlias().getReadAlias();

                                final String esDocId = createIndexDocId( applicationScope, entityId, mvccLogEntry.getVersion(), searchEdge);
                                final GetResponse response = esProvider.getClient().prepareGet(readAlias, "entity", esDocId)
                                    .execute()
                                    .actionGet();
                                final boolean exists = response.isExists();

                                // Accept any numeric representation of _timestamp; a missing or non-numeric
                                // value is reported as 0 ("no timestamp").
                                final Object rawTimestamp =
                                    response.getField("_timestamp") == null ? null : response.getField("_timestamp").getValue();
                                final long indexTimestamp = rawTimestamp instanceof Number ? ((Number) rawTimestamp).longValue() : 0L;

                                // NOTE(review): this is the timestamp of the entity UUID, not of the log entry's
                                // version, so every version row of one entity reports the same value - confirm intended.
                                final long uuidTimestamp = UUIDUtils.getTimestampInMillis(retrieved.getUuid());

                                // Index delay = Elasticsearch timestamp minus Cassandra timestamp; stays 0 when
                                // Elasticsearch returned no timestamp.
                                long diff = 0;
                                if (indexTimestamp > 0) {
                                    diff = indexTimestamp - uuidTimestamp;
                                }

                                try {

                                    final String csvLine =
                                        collectionName + "," +
                                        uuid + "," +
                                        mvccLogEntry.getVersion() + "," +
                                        uuidTimestamp + "," +
                                        indexTimestamp + "," +
                                        diff + "," +
                                        exists;

                                    versionAuditWriter.write(csvLine+"\n");
                                    versionAuditWriter.flush();
                                    versionCount.incrementAndGet();

                                } catch (Exception e) {
                                    // Best effort: a failed row must not abort the audit of the remaining versions.
                                    logger.error("Failed to write audit row for entity " + uuid + " version " + mvccLogEntry.getVersion(), e);
                                }
                            });

                            versionAggWriter.write(versionCount.toString()+","+entityId.getUuid()+"\n");
                            versionAggWriter.flush();

                        } else {
                            logger.info("entity: {} NOT FOUND", uuid);
                        }
                    } catch (Exception e) {
                        // Best effort: keep scanning the remaining edges when one entity fails.
                        logger.error("Failed to audit entity " + uuid, e);
                    }
                }

                return markedEdge;
            }).toBlocking().lastOrDefault(null);
        }

    }