in stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueRepairer.java [244:423]
/**
 * Walks the edges of one collection of an application in batches of 1000 and, for each
 * batch, loads the target entities and checks their unique-value mapping.
 *
 * <p>Each batch of edges is handed to a single-threaded executor created here, which
 * collects the target entity UUIDs. The entity load and the per-entity check are then
 * scheduled on {@code entityFetcher}. When {@code findMissingUniqueValues} is set, every
 * entity is looked up again through its unique field value; when that lookup finds nothing
 * and {@code fixMissingValue} is set, the unique-value row is rewritten.
 *
 * <p>The work is scheduled asynchronously, so this method may return before the batches
 * have been processed.
 *
 * @param applicationId  id of the application whose collection is walked
 * @param collectionName name of the collection; it is lower-cased to derive the edge type
 */
private void extractEntitiesForCollection(UUID applicationId, String collectionName) {
    AtomicInteger batch = new AtomicInteger(1);
    final EntityManager rootEm = emf.getEntityManager(applicationId);
    ExecutorService edgeScopeFetcher = Executors.newFixedThreadPool(1);
    allEntityIdsObs
        .getEdgesToEntities(Observable.just(CpNamingUtils.getApplicationScope(applicationId)),
            Optional.fromNullable(
                CpNamingUtils.getEdgeTypeFromCollectionName(collectionName.toLowerCase())),
            (lastEdge == null ? Optional.absent() : Optional.fromNullable(lastEdge)))
        .buffer(1000).finallyDo(() -> {
            edgeScopeFetcher.shutdown();
            logger.info("Finished fetching entity ids for {}. Shutting down entity edge scope fetcher ",
                collectionName);
            awaitEdgeScopeFetcherTermination(edgeScopeFetcher, collectionName);
        }).subscribe(edges -> {
            logger.info("For collection {}", collectionName);
            Integer batchId = batch.getAndIncrement();
            logger.info("Started fetching details for collection {} batch {} ", collectionName, batchId);
            Observable.just(edges).subscribeOn(Schedulers.from(edgeScopeFetcher)).subscribe(edgeScopes -> {
                // Collect the ids of all edges that have a target node. The list being
                // iterated is deliberately NOT modified: removing elements inside a
                // for-each loop fails with ConcurrentModificationException.
                List<UUID> entityIds = new ArrayList<UUID>(edgeScopes.size());
                EdgeScope firstUsableScope = null;
                for (EdgeScope edgeScope : edgeScopes) {
                    Id entityId = edgeScope.getEdge().getTargetNode();
                    if (entityId == null) {
                        continue;
                    }
                    if (firstUsableScope == null) {
                        firstUsableScope = edgeScope;
                    }
                    entityIds.add(entityId.getUuid());
                }
                if (firstUsableScope == null) {
                    // Nothing to load, and no edge to derive the entity type from.
                    logger.warn("No edges with a target node in batch {} of collection {}. Skipping batch.",
                        batchId, collectionName);
                    return;
                }
                final EdgeScope firstScope = firstUsableScope;
                try {
                    // The type of the first usable target is used for the whole batch.
                    String type = firstScope.getEdge().getTargetNode().getType();
                    Observable.just(entityIds).subscribeOn(Schedulers.from(entityFetcher)) // change to
                        .subscribe(entIds -> {
                            logger.info("Fetched {} entity id's of type {} for batch ID {}", entIds.size(),
                                type, batchId);
                            Results entities = rootEm.getEntities(entIds, type);
                            logger.info("Fetched {} entities of type {} for batch ID {}", entities.size(),
                                type, batchId);
                            try {
                                ConnectableObservable<Entity> entityObs = Observable
                                    .from(entities.getEntities()).publish();
                                // NOTE(review): the Observable returned by subscribeOn() is discarded,
                                // so this call has no effect and uniqueValueChecker is not used for the
                                // subscription below. Left unchanged because wiring it in would alter
                                // which threads run the checks - confirm the intended threading.
                                entityObs.subscribeOn(Schedulers.from(uniqueValueChecker));
                                entityObs.subscribe(
                                    t -> checkUniqueValueForEntity(applicationId, rootEm, t, batchId));
                                entityObs.connect();
                            } catch (Exception e) {
                                logger.error(
                                    "Error while checking unique values for batch id : {} for collection {}",
                                    batchId, collectionName, e);
                            }
                        });
                } catch (Exception e) {
                    logger.error("Exception while traversing entities " + firstScope.getEdge(), e);
                    // NOTE(review): exits with status 0 although this is a failure path;
                    // kept as-is because scripts may depend on the exit status - confirm.
                    System.exit(0);
                }
            });
            // Logged once the batch has been handed to the scheduler; the scheduled work
            // itself may still be running at this point.
            logger.info("Finished entity walk for collection {} for batch {}", collectionName, batchId);
        });
    logger.info("Exiting extractEntitiesForCollection() method.");
}

/**
 * Blocks until the already shut-down edge scope fetcher has terminated, polling in
 * 10 second slices. If the waiting thread is interrupted, the interrupt flag is restored
 * and the wait is abandoned instead of spinning on a swallowed exception.
 */
private void awaitEdgeScopeFetcherTermination(ExecutorService edgeScopeFetcher, String collectionName) {
    while (!edgeScopeFetcher.isTerminated()) {
        try {
            edgeScopeFetcher.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while waiting for entity edge scope fetcher to terminate for {}",
                collectionName);
            return;
        }
    }
    logger.info("Entity edge scope fetcher terminated after shutdown for {}", collectionName);
}

/**
 * Checks one entity: looks it up again by its unique field value and, if that lookup finds
 * nothing and {@code fixMissingValue} is set, repairs the unique-value mapping. Does
 * nothing beyond logging the UUID when {@code findMissingUniqueValues} is not set.
 * Failures are logged and never propagated, so one bad entity does not stop the batch.
 */
private void checkUniqueValueForEntity(UUID applicationId, EntityManager rootEm, Entity t, Integer batchId) {
    logger.info("Fetched entity with UUID : {}", t.getUuid());
    if (!findMissingUniqueValues) {
        return;
    }
    String fieldValue = resolveUniqueFieldValue(t, batchId);
    try {
        if (fieldValue == null) {
            logger.info("No value found for field {} for entity {}", fieldType, t.getUuid());
            return;
        }
        Entity existing = rootEm.getUniqueEntityFromAlias(t.getType(), fieldValue, false);
        if (existing != null) {
            logger.info("Found entity {} for field type {} and field value {}",
                existing.getUuid(), fieldType, fieldValue);
            return;
        }
        logger.info("No entity found for field type {} and field value {} but exists for UUID {}",
            fieldType, fieldValue, t.getUuid());
        if (fixMissingValue) {
            repairUniqueValueMapping(applicationId, t, fieldValue);
        }
    } catch (Exception e) {
        logger.error("Error while checking unique values for batch id : {} for entity {}",
            batchId, t.getUuid(), e);
    }
}

/**
 * Returns the value of the configured unique field for the given entity, or {@code null}
 * if it could not be read.
 *
 * <p>We can search an entity with its UUID or with name/email, depending on the entity
 * type. The mapping between the unique field value (name/email etc.) and the UUID is
 * stored in the unique value table. The field to use is passed in as {@code fieldType};
 * when it is unset, empty or "name" it is normalised to "name" and the entity name is
 * used, otherwise the matching getter is invoked reflectively.
 */
private String resolveUniqueFieldValue(Entity t, Integer batchId) {
    if (fieldType == null || fieldType.equals("") || fieldType.equals("name")) {
        fieldType = "name";
        return t.getName();
    }
    try {
        Method method = t.getClass()
            .getMethod("get" + fieldType.substring(0, 1).toUpperCase() + fieldType.substring(1));
        return (String) method.invoke(t);
    } catch (Exception e1) {
        logger.error("Exception while trying to fetch field value of type {} for entity {} batch {}",
            fieldType, t.getUuid(), batchId, e1);
        return null;
    }
}

/**
 * Rewrites the unique-value row for the entity: loads the currently stored unique value,
 * verifies the entity itself can be loaded, attempts to delete the stored value (a failed
 * delete is logged and ignored) and then writes a new unique value that points at the
 * entity's id and version.
 *
 * @throws RuntimeException if the entity cannot be loaded by its UUID
 * @throws Exception        anything raised by the load or write calls; handled by the caller
 */
private void repairUniqueValueMapping(UUID applicationId, Entity t, String fieldValue) throws Exception {
    logger.info("Trying to repair unique value mapping for {} ", t.getUuid());
    ApplicationScope applicationScope = new ApplicationScopeImpl(
        new SimpleId(applicationId, "application"));
    UniqueValueSet uniqueValueSet = uniqueValueSerializationStrategy
        .load(applicationScope,
            ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "LOCAL_QUORUM")),
            t.getType(),
            Collections.singletonList(new StringField(fieldType, fieldValue)),
            false);
    com.google.common.base.Optional<MvccEntity> entity = mvccEntitySerializationStrategy
        .load(applicationScope, new SimpleId(t.getUuid(), t.getType()));
    if (!entity.isPresent() || !entity.get().getEntity().isPresent()) {
        throw new RuntimeException(
            "Unable to update unique value index because supplied UUID "
                + t.getUuid()
                + " does not exist");
    }
    UniqueValue staleValue = uniqueValueSet.getValue(fieldType);
    logger.info("Delete unique value: {}", staleValue);
    try {
        session.execute(uniqueValueSerializationStrategy.deleteCQL(applicationScope, staleValue));
    } catch (Exception ex) {
        logger.error(
            "Exception while trying to delete the Unique value for {}. Will proceed with creating new entry",
            t.getUuid(), ex);
    }
    UniqueValue newUniqueValue = new UniqueValueImpl(
        new StringField(fieldType, fieldValue),
        entity.get().getId(),
        entity.get().getVersion());
    logger.info("Writing new unique value: {}", newUniqueValue);
    session.execute(uniqueValueSerializationStrategy.writeCQL(applicationScope, newUniqueValue, -1));
}