in stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java [126:297]
/**
 * Entry point for the UniqueValueScanner tool.
 *
 * <p>Runs in one of two modes:
 * <ul>
 *   <li><b>Single-entity load</b> — when an entity name is supplied, loads the unique value set
 *   for that name via {@code uniqueValueSerializationStrategy.load(...)} and logs it. Both the
 *   application UUID and the entity type options are required in this mode.</li>
 *   <li><b>Full scan</b> — otherwise, iterates all rows of {@code CF_UNIQUE_VALUES} (optionally
 *   restricted to one application) and, for rows of the requested field that have more than one
 *   column, logs an error for every column of the requested entity type whose entity UUID
 *   differs from the first such column's UUID.</li>
 * </ul>
 *
 * <p>The read consistency level comes from the {@code usergrid.read.cl} system property
 * (default {@code CL_LOCAL_QUORUM}). Because the Astyanax enum names its constants with a
 * {@code CL_} prefix and the enum passed to the serialization strategy does not, the value may
 * be given in either form and is normalized for each client.
 *
 * @param line parsed command-line options
 * @throws RuntimeException if an entity name is given without the application UUID or without
 *                          the entity type
 * @throws Exception        propagated from the calls made here (e.g. {@code startSpring()})
 */
public void runTool( CommandLine line ) throws Exception {
    startSpring();

    // Absent or blank option means "scan all applications" (previously an absent option NPE'd).
    final UUID appToFilter = parseApplicationFilter(line.getOptionValue(APPLICATION_ARG));

    final String readCl = System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM");

    logger.info("Starting Tool: UniqueValueScanner");
    logger.info("Using Cassandra consistency level: {}", readCl);

    keyspace = injector.getInstance(com.netflix.astyanax.Keyspace.class);
    mvccEntitySerializationStrategy = injector.getInstance(MvccEntitySerializationStrategy.class);
    uniqueValueSerializationStrategy = injector.getInstance(UniqueValueSerializationStrategy.class);

    final String fieldTypeArg = line.getOptionValue(ENTITY_FIELD_TYPE_ARG);
    final String fieldType = fieldTypeArg != null ? fieldTypeArg : "name";
    final String entityType = line.getOptionValue(ENTITY_TYPE_ARG);
    final String entityName = line.getOptionValue(ENTITY_NAME_ARG);

    if (entityName != null && !entityName.isEmpty()) {
        if (appToFilter == null) {
            throw new RuntimeException("Cannot execute UniqueValueScanner with specific entity without the " +
                "application UUID for which the entity should exist.");
        }
        if (entityType == null) {
            throw new RuntimeException("Cannot execute UniqueValueScanner without the entity type (singular " +
                "collection name).");
        }

        logger.info("Running entity unique load only");

        // Per the original "w/o read repair" note, the trailing `false` presumably disables read
        // repair on this load — TODO confirm against UniqueValueSerializationStrategy.load.
        UniqueValueSet uniqueValueSet = uniqueValueSerializationStrategy.load(
            new ApplicationScopeImpl( new SimpleId(appToFilter, "application" ) ),
            ConsistencyLevel.valueOf(toDriverConsistencyLevel(readCl)), entityType,
            Collections.singletonList(new StringField( fieldType, entityName) ), false);

        StringBuilder stringBuilder = new StringBuilder("[");
        uniqueValueSet.forEach( uniqueValue -> {
            String entry = "fieldName="+uniqueValue.getField().getName()+
                ", fieldValue="+uniqueValue.getField().getValue()+
                ", uuid="+uniqueValue.getEntityId().getUuid()+
                ", type="+uniqueValue.getEntityId().getType()+
                ", version="+uniqueValue.getEntityVersion();
            stringBuilder.append("{").append(entry).append("},");
        });
        // Drop only a trailing separator; with an empty set this keeps the opening "[".
        if (stringBuilder.charAt(stringBuilder.length() - 1) == ',') {
            stringBuilder.setLength(stringBuilder.length() - 1);
        }
        stringBuilder.append("]");

        logger.info("Returned unique value set from serialization load = {}", stringBuilder.toString());
        return;
    }

    logger.info("Running entity unique scanner only");

    // scan through all unique values and log some info
    Iterator<com.netflix.astyanax.model.Row<ScopedRowKey<TypeField>, EntityVersion>> rows;
    try {
        rows = keyspace.prepareQuery(CF_UNIQUE_VALUES)
            .setConsistencyLevel(com.netflix.astyanax.model.ConsistencyLevel.valueOf(
                toAstyanaxConsistencyLevel(readCl)))
            .getAllRows()
            .withColumnRange(new RangeBuilder().setLimit(1000).build())
            .execute().getResult().iterator();
    } catch (ConnectionException e) {
        // Nothing can be scanned; stop here instead of also reporting "No rows returned".
        logger.error("Error connecting to cassandra", e);
        return;
    }

    if (rows == null) {
        logger.warn("No rows returned from table: {}", CF_UNIQUE_VALUES.getName());
        return;
    }

    final AtomicInteger count = new AtomicInteger(0);
    rows.forEachRemaining(row -> {
        final int scanned = count.incrementAndGet();
        if (scanned % 1000 == 0) {
            logger.info("Scanned {} rows in {}", scanned, CF_UNIQUE_VALUES.getName());
        }

        final String fieldName = row.getKey().getKey().getField().getName();
        final UUID scopeUUID = row.getKey().getScope().getUuid();

        // Only rows for the requested field and, if filtering, the requested application.
        if (!fieldName.equalsIgnoreCase(fieldType)
            || (appToFilter != null && !appToFilter.equals(scopeUUID))) {
            return;
        }
        // A duplicate needs at least two columns in the row.
        if (row.getColumns() == null || row.getColumns().size() <= 1) {
            return;
        }

        final String fieldValue = row.getKey().getKey().getField().getValue().toString();
        final String scopeType = row.getKey().getScope().getType();

        // First column (of the requested entity type) seen in this row; later columns are
        // compared against it.
        EntityVersion first = null;
        for (Column<EntityVersion> column : row.getColumns()) {
            final EntityVersion entityVersion = column.getName();
            logger.trace("{}: {}, {}: {}, entity type: {}, entity uuid: {}",
                scopeType, scopeUUID, fieldName, fieldValue,
                entityVersion.getEntityId().getType(), entityVersion.getEntityId().getUuid());

            // Columns are only compared when an entity type was requested and matches.
            if (entityType == null
                || !entityVersion.getEntityId().getType().equalsIgnoreCase(entityType)) {
                continue;
            }

            if (first == null) {
                first = entityVersion;
            } else if (!first.getEntityId().getUuid().equals(entityVersion.getEntityId().getUuid())) {
                logger.error("Duplicate found for field [{}={}]. Entry 1: [{}], Entry 2: [{}]",
                    fieldName, fieldValue, first.getEntityId(), entityVersion.getEntityId());
            }
        }
    });
}

/**
 * Parses the optional application-filter option.
 *
 * @param value raw option value; may be {@code null} when the option was not supplied
 * @return the parsed UUID, or {@code null} when the value is {@code null} or blank
 * @throws IllegalArgumentException if the value is non-blank but not a valid UUID
 */
private static UUID parseApplicationFilter(String value) {
    if (value == null || value.trim().isEmpty()) {
        return null;
    }
    return UUID.fromString(value.trim());
}

/**
 * Returns the consistency-level name without a leading {@code CL_} prefix
 * (e.g. {@code CL_LOCAL_QUORUM -> LOCAL_QUORUM}), the form used by the {@code ConsistencyLevel}
 * enum handed to the unique-value serialization strategy.
 */
private static String toDriverConsistencyLevel(String level) {
    return level.startsWith("CL_") ? level.substring("CL_".length()) : level;
}

/**
 * Returns the consistency-level name with a leading {@code CL_} prefix
 * (e.g. {@code LOCAL_QUORUM -> CL_LOCAL_QUORUM}), the form used by Astyanax's
 * {@code ConsistencyLevel} enum.
 */
private static String toAstyanaxConsistencyLevel(String level) {
    return level.startsWith("CL_") ? level : "CL_" + level;
}