private void extractEntitiesForCollection()

in stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueRepairer.java [244:423]


	/**
	 * Walks every entity of one collection in an application and verifies its unique value mapping.
	 *
	 * <p>For each entity, the value of the configured unique field ({@code fieldType}, defaulting to
	 * {@code name}) is looked up with {@code rootEm.getUniqueEntityFromAlias}. When no entity is found
	 * for that value and {@code fixMissingValue} is set, any existing unique value row for the field is
	 * deleted and a new one is written pointing at the entity's current version.
	 *
	 * <p>Edges are read in batches of 1000. Each batch is handed to a private single-thread executor,
	 * which collects the target entity ids and submits them to {@code entityFetcher}, where the entities
	 * are loaded and checked. The per-batch "Finished entity walk" log line is written once the batch
	 * has been handed off, not when its checks have completed.
	 *
	 * @param applicationId  id of the application that owns the collection
	 * @param collectionName name of the collection to walk; lower-cased to derive the edge type
	 */
	private void extractEntitiesForCollection(UUID applicationId, String collectionName) {

		AtomicInteger batch = new AtomicInteger(1);

		final EntityManager rootEm = emf.getEntityManager(applicationId);

		// One thread: batches are handed to the entity fetcher one at a time.
		ExecutorService edgeScopeFetcher = Executors.newFixedThreadPool(1);
		allEntityIdsObs
				.getEdgesToEntities(Observable.just(CpNamingUtils.getApplicationScope(applicationId)),
						Optional.fromNullable(
								CpNamingUtils.getEdgeTypeFromCollectionName(collectionName.toLowerCase())),
						(lastEdge == null ? Optional.absent() : Optional.fromNullable(lastEdge)))
				.buffer(1000).finallyDo(() -> {
					edgeScopeFetcher.shutdown();
					logger.info("Finished fetching entity ids for {}. Shutting down entity edge scope fetcher ",
							collectionName);
					try {
						while (!edgeScopeFetcher.isTerminated()) {
							edgeScopeFetcher.awaitTermination(10, TimeUnit.SECONDS);
						}
						logger.info("Entity edge scope fetcher terminated after shutdown for {}", collectionName);
					} catch (InterruptedException e) {
						// Restore the interrupt flag instead of swallowing it; retrying awaitTermination
						// with the flag set would just throw again immediately.
						Thread.currentThread().interrupt();
						logger.warn("Interrupted while waiting for entity edge scope fetcher to terminate for {}",
								collectionName);
					}
				}).subscribe(edges -> {

					logger.info("For collection {}", collectionName);
					Integer batchId = batch.getAndIncrement();
					logger.info("Started fetching details for collection {} batch {} ", collectionName, batchId);
					Observable.just(edges).subscribeOn(Schedulers.from(edgeScopeFetcher))
							.subscribe(edgeScopes -> processEdgeScopeBatch(applicationId, collectionName, rootEm,
									edgeScopes, batchId));
					// Logged after hand-off to edgeScopeFetcher; the batch may still be in progress.
					logger.info("Finished entity walk for collection {} for batch {}", collectionName, batchId);
				});
		logger.info("Exiting extractEntitiesForCollection() method.");
	}

	/**
	 * Collects the target entity ids of one batch of edges and schedules their unique value check on
	 * {@code entityFetcher}.
	 *
	 * <p>Edges without a target node are skipped. {@code rootEm.getEntities} takes a single type, so the
	 * whole batch is assumed to share the type of its first valid target.
	 *
	 * @param applicationId  id of the application that owns the collection
	 * @param collectionName collection name, used for logging
	 * @param rootEm         entity manager of the application
	 * @param edgeScopes     one buffered batch of edges; not modified
	 * @param batchId        1-based batch number, used for logging
	 */
	private void processEdgeScopeBatch(UUID applicationId, String collectionName, EntityManager rootEm,
			List<? extends EdgeScope> edgeScopes, Integer batchId) {

		// Build a separate id list instead of removing null-target edges from edgeScopes while
		// iterating it, which throws ConcurrentModificationException or silently skips an edge.
		List<UUID> entityIds = new ArrayList<UUID>(edgeScopes.size());
		String firstType = null;
		for (EdgeScope edgeScope : edgeScopes) {
			Id entityId = edgeScope.getEdge().getTargetNode();
			if (entityId == null) {
				continue;
			}
			if (firstType == null) {
				firstType = entityId.getType();
			}
			entityIds.add(entityId.getUuid());
		}

		if (entityIds.isEmpty()) {
			// Nothing to fetch; an empty batch must not reach the get(0) calls below.
			logger.info("No entity ids found for collection {} batch {}", collectionName, batchId);
			return;
		}

		final String type = firstType;
		try {
			Observable.just(entityIds).subscribeOn(Schedulers.from(entityFetcher))
					.subscribe(entIds -> checkUniqueValuesForEntities(applicationId, collectionName, rootEm,
							entIds, type, batchId));
		} catch (Exception e) {
			// edgeScopes is non-empty here because entityIds is non-empty.
			logger.error("Exception while traversing entities " + edgeScopes.get(0).getEdge(), e);
			// Abnormal termination: report a failing exit status.
			System.exit(1);
		}
	}

	/**
	 * Loads one batch of entities of a single type and checks the unique value mapping of each.
	 *
	 * <p>Entities are checked sequentially on the calling thread. NOTE(review): {@code uniqueValueChecker}
	 * is not used here. {@code Observable.subscribeOn} returns a new Observable, so calling it and
	 * ignoring the result never moved this work onto that scheduler. Wire it in explicitly if parallel
	 * checks are wanted.
	 *
	 * @param applicationId  id of the application that owns the collection
	 * @param collectionName collection name, used for logging
	 * @param rootEm         entity manager of the application
	 * @param entIds         uuids of the entities to load
	 * @param type           entity type shared by the batch
	 * @param batchId        batch number, used for logging
	 */
	private void checkUniqueValuesForEntities(UUID applicationId, String collectionName, EntityManager rootEm,
			List<UUID> entIds, String type, Integer batchId) {

		logger.info("Fetched {} entity id's of type {} for batch ID {}", entIds.size(), type, batchId);
		Results entities = rootEm.getEntities(entIds, type);
		logger.info("Fetched {} entities of type {} for batch ID {}", entities.size(), type, batchId);
		try {
			for (Entity entity : entities.getEntities()) {
				checkUniqueValueForEntity(applicationId, rootEm, entity, batchId);
			}
		} catch (Exception e) {
			logger.error("Error while checking unique values for batch id : {} for collection {}", batchId,
					collectionName, e);
		}
	}

	/**
	 * Checks that the entity's unique field value resolves back to an entity and, when
	 * {@code fixMissingValue} is set, repairs a missing mapping. Errors are logged, not rethrown.
	 *
	 * @param applicationId id of the application that owns the entity
	 * @param rootEm        entity manager of the application
	 * @param t             entity to check
	 * @param batchId       batch number, used for logging
	 */
	private void checkUniqueValueForEntity(UUID applicationId, EntityManager rootEm, Entity t, Integer batchId) {
		logger.info("Fetched entity with UUID : {}", t.getUuid());
		if (!findMissingUniqueValues) {
			return;
		}

		// The unique value table maps a field value (name, email, ...) to an entity UUID. The field is
		// configurable; it defaults to "name". Normalize into a local instead of writing the shared
		// fieldType field from worker threads.
		String field = (fieldType == null || fieldType.equals("")) ? "name" : fieldType;
		String fieldValue = resolveUniqueFieldValue(t, field, batchId);

		try {
			if (fieldValue == null) {
				logger.info("No value found for field {} for entity {}", field, t.getUuid());
				return;
			}

			Entity e = rootEm.getUniqueEntityFromAlias(t.getType(), fieldValue, false);
			if (e != null) {
				logger.info("Found entity {} for field type {} and field value {}", e.getUuid(), field, fieldValue);
				return;
			}

			logger.info("No entity found for field type {} and field value {} but exists for UUID {}", field,
					fieldValue, t.getUuid());
			if (fixMissingValue) {
				repairUniqueValueMapping(applicationId, t, field, fieldValue);
			}
		} catch (Exception e) {
			logger.error("Error while checking unique values for batch id : {} for entity {}", batchId,
					t.getUuid(), e);
		}
	}

	/**
	 * Reads the value of {@code field} from the entity: {@code getName()} for "name", otherwise the
	 * bean getter {@code get<Field>()} via reflection.
	 *
	 * @return the field value, or {@code null} if the getter is missing, fails, or does not return a String
	 */
	private String resolveUniqueFieldValue(Entity entity, String field, Integer batchId) {
		if (field.equals("name")) {
			return entity.getName();
		}
		try {
			Method getter = entity.getClass()
					.getMethod("get" + field.substring(0, 1).toUpperCase() + field.substring(1));
			return (String) getter.invoke(entity);
		} catch (Exception e1) {
			logger.error("Exception while trying to fetch field value of type {} for entity {} batch {}", field,
					entity.getUuid(), batchId, e1);
			return null;
		}
	}

	/**
	 * Rewrites the unique value row for {@code field}/{@code fieldValue} so it points at the current
	 * version of entity {@code t}. Any existing row for that value is deleted first. A failed delete is
	 * logged and the write still proceeds.
	 *
	 * @throws RuntimeException if no MVCC entity exists for {@code t}
	 * @throws Exception        propagated from the serialization strategies or the session
	 */
	private void repairUniqueValueMapping(UUID applicationId, Entity t, String field, String fieldValue)
			throws Exception {
		logger.info("Trying to repair unique value mapping for {} ", t.getUuid());

		ApplicationScope applicationScope = new ApplicationScopeImpl(new SimpleId(applicationId, "application"));

		UniqueValueSet uniqueValueSet = uniqueValueSerializationStrategy.load(applicationScope,
				ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "LOCAL_QUORUM")), t.getType(),
				Collections.singletonList(new StringField(field, fieldValue)), false);

		com.google.common.base.Optional<MvccEntity> entity = mvccEntitySerializationStrategy.load(applicationScope,
				new SimpleId(t.getUuid(), t.getType()));

		if (!entity.isPresent() || !entity.get().getEntity().isPresent()) {
			throw new RuntimeException(
					"Unable to update unique value index because supplied UUID " + t.getUuid() + " does not exist");
		}

		UniqueValue existingValue = uniqueValueSet.getValue(field);
		logger.info("Delete unique value: {}", existingValue);
		if (existingValue != null) {
			try {
				session.execute(uniqueValueSerializationStrategy.deleteCQL(applicationScope, existingValue));
			} catch (Exception ex) {
				logger.error(
						"Exception while trying to delete the Unique value for {}. Will proceed with creating new entry",
						t.getUuid(), ex);
			}
		}

		UniqueValue newUniqueValue = new UniqueValueImpl(new StringField(field, fieldValue), entity.get().getId(),
				entity.get().getVersion());
		logger.info("Writing new unique value: {}", newUniqueValue);
		session.execute(uniqueValueSerializationStrategy.writeCQL(applicationScope, newUniqueValue, -1));
	}