in stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java [253:465]
private UniqueValueSet loadCQL( final ApplicationScope appScope,
final ConsistencyLevel consistencyLevel,
final String type, final Collection<Field> fields, boolean useReadRepair ) {
Preconditions.checkNotNull( fields, "fields are required" );
Preconditions.checkArgument( fields.size() > 0, "More than 1 field must be specified" );
final Id applicationId = appScope.getApplication();
// row key = app UUID + app type + entityType + field type + field name + field value
//List<ByteBuffer> partitionKeys = new ArrayList<>( fields.size() );
final UniqueValueSetImpl uniqueValueSet = new UniqueValueSetImpl( fields.size() );
for ( Field field : fields ) {
//log.info(Bytes.toHexString(getPartitionKey(applicationId, type,
// field.getTypeName().toString(), field.getName(), field.getValue())));
//partitionKeys.add(getPartitionKey(applicationId, type,
// field.getTypeName().toString(), field.getName(), field.getValue()));
final Clause inKey = QueryBuilder.in("key", getPartitionKey(applicationId, type,
field.getTypeName().toString(), field.getName(), field.getValue()) );
final Statement statement = QueryBuilder.select().all().from(TABLE_UNIQUE_VALUES)
.where(inKey)
.setConsistencyLevel(consistencyLevel);
final ResultSet resultSet = session.execute(statement);
Iterator<com.datastax.driver.core.Row> results = resultSet.iterator();
if( !results.hasNext()){
if(logger.isTraceEnabled()){
logger.trace("No rows returned for unique value lookup of field: {}", field);
}
}
List<UniqueValue> candidates = new ArrayList<>();
while( results.hasNext() ){
final com.datastax.driver.core.Row unique = results.next();
ByteBuffer partitionKey = unique.getBytes("key");
ByteBuffer column = unique.getBytesUnsafe("column1");
List<Object> keyContents = deserializePartitionKey(partitionKey);
List<Object> columnContents = deserializeUniqueValueColumn(column);
FieldTypeName fieldType;
String name;
String value;
if(this instanceof UniqueValueSerializationStrategyV2Impl) {
fieldType = FieldTypeName.valueOf((String) keyContents.get(3));
name = (String) keyContents.get(4);
value = (String) keyContents.get(5);
}else{
fieldType = FieldTypeName.valueOf((String) keyContents.get(5));
name = (String) keyContents.get(6);
value = (String) keyContents.get(7);
}
Field returnedField = getField(name, value, fieldType);
final EntityVersion entityVersion = new EntityVersion(
new SimpleId((UUID)columnContents.get(1), (String)columnContents.get(2)), (UUID)columnContents.get(0));
// //sanity check, nothing to do, skip it
// if ( !columnList.hasNext() ) {
// if(logger.isTraceEnabled()){
// logger.trace("No cells exist in partition for unique value [{}={}]",
// field.getName(), field.getValue().toString());
// }
// continue;
// }
/**
* While iterating the rows, a rule is enforced to only EVER return the oldest UUID for the field.
* This means the UUID with the oldest timestamp ( it was the original entity written for
* the unique value ).
*
* We do this to prevent cycling of unique value -> entity UUID mappings as this data is ordered by the
* entity's version and not the entity's timestamp itself.
*
* If newer entity UUIDs are encountered, they are removed from the unique value tables, however their
* backing serialized entity data is left in tact in case a cleanup / audit is later needed.
*/
final UniqueValue uniqueValue =
new UniqueValueImpl(returnedField, entityVersion.getEntityId(), entityVersion.getEntityVersion());
// set the initial candidate and move on
if (candidates.size() == 0) {
candidates.add(uniqueValue);
if (logger.isTraceEnabled()) {
logger.trace("First entry for unique value [{}={}] found for application [{}], adding " +
"entry with entity id [{}] and entity version [{}] to the candidate list and continuing",
returnedField.getName(), returnedField.getValue().toString(), applicationId.getType(),
uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion());
}
continue;
}
if(!useReadRepair){
// take only the first
if (logger.isTraceEnabled()) {
logger.trace("Read repair not enabled for this request of unique value [{}={}], breaking out" +
" of cell loop", returnedField.getName(), returnedField.getValue().toString());
}
break;
} else {
final int result = uniqueValueComparator.compare(uniqueValue, candidates.get(candidates.size() - 1));
if (result == 0) {
// do nothing, only versions can be newer and we're not worried about newer versions of same entity
if (logger.isTraceEnabled()) {
logger.trace("Current unique value [{}={}] entry has UUID [{}] equal to candidate UUID [{}]",
returnedField.getName(), returnedField.getValue().toString(), uniqueValue.getEntityId().getUuid(),
candidates.get(candidates.size() -1));
}
// update candidate w/ latest version
candidates.add(uniqueValue);
} else if (result < 0) {
// delete the duplicate from the unique value index
candidates.forEach(candidate -> {
logger.warn("Duplicate unique value [{}={}] found for application [{}], removing newer " +
"entry with entity id [{}] and entity version [{}]", returnedField.getName(),
returnedField.getValue().toString(), applicationId.getUuid(),
candidate.getEntityId().getUuid(), candidate.getEntityVersion());
session.execute(deleteCQL(appScope, candidate));
});
// clear the transient candidates list
candidates.clear();
if (logger.isTraceEnabled()) {
logger.trace("Updating candidate unique value [{}={}] to entity id [{}] and " +
"entity version [{}]", returnedField.getName(), returnedField.getValue().toString(),
uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion());
}
// add our new candidate to the list
candidates.add(uniqueValue);
} else {
logger.warn("Duplicate unique value [{}={}] found for application [{}], removing newer entry " +
"with entity id [{}] and entity version [{}].", returnedField.getName(), returnedField.getValue().toString(),
applicationId.getUuid(), uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion());
// delete the duplicate from the unique value index
session.execute(deleteCQL(appScope, uniqueValue));
}
}
}
if ( candidates.size() > 0 ) {
// take the last candidate ( should be the latest version) and add to the result set
final UniqueValue returnValue = candidates.get(candidates.size() - 1);
if (logger.isTraceEnabled()) {
logger.trace("Adding unique value [{}={}] with entity id [{}] and entity version [{}] to response set",
returnValue.getField().getName(), returnValue.getField().getValue().toString(),
returnValue.getEntityId().getUuid(), returnValue.getEntityVersion());
}
uniqueValueSet.addValue(returnValue);
}
}
return uniqueValueSet;
}