in stack/tools/src/main/java/org/apache/usergrid/tools/EntityVersionAudit.java [139:275]
/**
 * Walks the collection edges of one entity type in an application and, for each entity
 * version found in the version log, checks whether the corresponding document can be
 * fetched from Elasticsearch.
 *
 * <p>Two files are written to the current working directory:
 * <ul>
 *   <li>{@code entity_version_audit.txt} - one CSV row per audited entity version
 *       (the header row is written first);</li>
 *   <li>{@code entity_version_agg.txt} - one {@code versionCount,entityUUID} row per entity.</li>
 * </ul>
 *
 * <p>Failures while auditing a single entity are logged and do not stop the run.
 *
 * @param line parsed command line; must carry the application id and the singular entity
 *             type, and may carry the "use latest version" flag and a single entity UUID
 *             to restrict the audit to
 * @throws RuntimeException if the application id or the entity type option is blank
 * @throws IllegalArgumentException if the application id or the entity UUID is not a valid UUID
 * @throws Exception if the output files cannot be opened or written
 */
public void runTool( CommandLine line ) throws Exception {

    logger.info("Starting Tool: EntityVersionAudit");
    logger.info("Using Cassandra consistency level: {}", System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM"));

    startSpring();

    final String applicationOption = line.getOptionValue(APPLICATION_ARG);
    final String entityTypeOption = line.getOptionValue(ENTITY_TYPE_ARG);

    if (isBlank(applicationOption)) {
        throw new RuntimeException("Application ID not provided.");
    }
    final UUID app = UUID.fromString(applicationOption);

    if (isBlank(entityTypeOption)) {
        throw new RuntimeException("Entity type (singular collection name) not provided.");
    }
    final String entityType = entityTypeOption;

    // A missing option yields null, which equalsIgnoreCase treats as "not true".
    final boolean useLatestVersion = "true".equalsIgnoreCase(line.getOptionValue(USE_LATEST_VERSION_ARG));
    logger.info("useLatestVersion {}", useLatestVersion);

    final String entityUUID = line.getOptionValue(ENTITY_UUID);
    logger.info("entityUUID {}", entityUUID);
    // Parse the optional filter once, up front, so a malformed value fails fast instead of
    // being re-parsed (and failing) for every edge in the stream.
    final UUID entityUuidFilter = entityUUID == null ? null : UUID.fromString(entityUUID);

    em = emf.getEntityManager( app );

    final String collectionName = InflectionUtils.pluralize(entityType);
    final String simpleEdgeType = CpNamingUtils.getEdgeTypeFromCollectionName(collectionName);
    logger.info("simpleEdgeType: {}", simpleEdgeType);

    final ApplicationScope applicationScope = new ApplicationScopeImpl(new SimpleId(app, "application"));
    final Id applicationScopeId = applicationScope.getApplication();
    logger.info("applicationScope.getApplication(): {}", applicationScopeId);

    final GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class );
    final GraphManager gm = gmf.createEdgeManager(applicationScope);

    // Deliberately NOT named "emf": that would shadow the field used above to obtain "em".
    final EntityCollectionManagerFactory ecmf = injector.getInstance( EntityCollectionManagerFactory.class );
    final EntityCollectionManager ecm = ecmf.createCollectionManager(applicationScope);

    final SimpleSearchByEdgeType search =
        new SimpleSearchByEdgeType( applicationScopeId, simpleEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
            Optional.absent(), false );

    final IndexLocationStrategyFactory ilsf = injector.getInstance(IndexLocationStrategyFactory.class);
    final EsProvider esProvider = injector.getInstance(EsProvider.class);

    // try-with-resources guarantees both files are flushed and closed even if the scan fails.
    try (Writer versionAuditWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("entity_version_audit.txt"), "utf-8"));
         Writer versionAggWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("entity_version_agg.txt"), "utf-8"))) {

        versionAuditWriter.write("collection,entityUUID,entityVersion,cassandraTimestamp,elasticsearchTimestamp,indexDelayMillis,existsInElasticsearch\n");
        versionAuditWriter.flush();

        gm.loadEdgesFromSource(search).map(markedEdge -> {

            final UUID uuid = markedEdge.getTargetNode().getUuid();

            // When a single entity was requested, skip every other edge.
            if (entityUuidFilter != null && !entityUuidFilter.equals(uuid)) {
                return markedEdge;
            }
            logger.info("matched uuid: {}", uuid);

            try {
                final EntityRef entityRef = new SimpleEntityRef(entityType, uuid);
                final org.apache.usergrid.persistence.Entity retrieved = em.get(entityRef);

                if ( retrieved == null ) {
                    logger.info("entity: {} NOT FOUND", uuid);
                    return markedEdge;
                }

                // Everything below is identical for all versions of this entity, so it is
                // computed once here rather than once per version.
                final Id entityId = retrieved.asId();
                final IndexLocationStrategy strategy = ilsf.getIndexLocationStrategy(applicationScope);
                final String readAlias = strategy.getAlias().getReadAlias();
                final SearchEdge searchEdge = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(),
                    CpNamingUtils.getEdgeTypeFromCollectionName( InflectionUtils.pluralize( entityId.getType() ) ), entityId,
                    Long.MAX_VALUE ) );
                // NOTE(review): this timestamp comes from the entity UUID, not from the version
                // UUID, so it is the same on every version row - confirm that is intended.
                final long cassandraTimestamp = UUIDUtils.getTimestampInMillis(retrieved.getUuid());

                final AtomicInteger versionCount = new AtomicInteger();

                Observable<MvccLogEntry> versionObs =
                    ecm.getVersionsFromMaxToMin( entityId, org.apache.usergrid.utils.UUIDUtils.newTimeUUID() );
                if (useLatestVersion) {
                    versionObs = versionObs.take(1);
                }

                versionObs.forEach( mvccLogEntry -> {

                    final String esDocId = createIndexDocId( applicationScope, entityId, mvccLogEntry.getVersion(), searchEdge);

                    final GetResponse response = esProvider.getClient().prepareGet(readAlias, "entity", esDocId)
                        .execute()
                        .actionGet();

                    final boolean exists = response.isExists();
                    // 0 means Elasticsearch returned no "_timestamp" field for this document.
                    final long indexTimestamp =
                        response.getField("_timestamp") == null ? 0 : (long)response.getField("_timestamp").getValue();

                    // Delay between the Cassandra-side timestamp and the Elasticsearch one.
                    // (Previously written as a chained assignment, which clobbered the Cassandra
                    // timestamp and reported the raw index timestamp as the delay.)
                    long indexDelayMillis = 0;
                    if (indexTimestamp > 0) {
                        indexDelayMillis = indexTimestamp - cassandraTimestamp;
                    }

                    try {
                        final String csvLine =
                            collectionName + "," +
                            uuid + "," +
                            mvccLogEntry.getVersion() + "," +
                            cassandraTimestamp + "," +
                            indexTimestamp + "," +
                            indexDelayMillis + "," +
                            exists;

                        versionAuditWriter.write(csvLine+"\n");
                        versionAuditWriter.flush();
                        versionCount.incrementAndGet();
                    } catch (Exception e) {
                        // Best effort: one unwritable row must not abort the whole audit.
                        logger.error("Unable to write audit row for entity {} version {}",
                            uuid, mvccLogEntry.getVersion(), e);
                    }
                });

                versionAggWriter.write(versionCount.toString()+","+entityId.getUuid()+"\n");
                versionAggWriter.flush();

            } catch (Exception e) {
                // Best effort: keep scanning the remaining entities.
                logger.error("Failed to audit entity {}", uuid, e);
            }

            return markedEdge;

        }).toBlocking().lastOrDefault(null); // block until every edge has been processed
    }
}