private UniqueValueSet loadCQL()

in stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java [253:465]


    private UniqueValueSet loadCQL( final ApplicationScope appScope,
                                    final ConsistencyLevel consistencyLevel,
                                    final String type, final Collection<Field> fields, boolean useReadRepair ) {

        Preconditions.checkNotNull( fields, "fields are required" );
        Preconditions.checkArgument( fields.size() > 0, "More than 1 field must be specified" );


        final Id applicationId = appScope.getApplication();

        // row key = app UUID + app type + entityType + field type + field name + field value




        //List<ByteBuffer> partitionKeys = new ArrayList<>( fields.size() );

        final UniqueValueSetImpl uniqueValueSet = new UniqueValueSetImpl( fields.size() );


        for ( Field field : fields ) {

            //log.info(Bytes.toHexString(getPartitionKey(applicationId, type,
            // field.getTypeName().toString(), field.getName(), field.getValue())));

            //partitionKeys.add(getPartitionKey(applicationId, type,
            // field.getTypeName().toString(), field.getName(), field.getValue()));

            final Clause inKey = QueryBuilder.in("key", getPartitionKey(applicationId, type,
                field.getTypeName().toString(), field.getName(), field.getValue()) );

            final Statement statement = QueryBuilder.select().all().from(TABLE_UNIQUE_VALUES)
                .where(inKey)
                .setConsistencyLevel(consistencyLevel);

            final ResultSet resultSet = session.execute(statement);


            Iterator<com.datastax.driver.core.Row> results = resultSet.iterator();

            if( !results.hasNext()){
                if(logger.isTraceEnabled()){
                    logger.trace("No rows returned for unique value lookup of field: {}", field);
                }
            }


            List<UniqueValue> candidates = new ArrayList<>();

            while( results.hasNext() ){

                final com.datastax.driver.core.Row unique = results.next();
                ByteBuffer partitionKey = unique.getBytes("key");
                ByteBuffer column = unique.getBytesUnsafe("column1");

                List<Object> keyContents = deserializePartitionKey(partitionKey);
                List<Object> columnContents = deserializeUniqueValueColumn(column);

                FieldTypeName fieldType;
                String name;
                String value;
                if(this instanceof UniqueValueSerializationStrategyV2Impl) {


                    fieldType = FieldTypeName.valueOf((String) keyContents.get(3));
                    name = (String) keyContents.get(4);
                    value = (String) keyContents.get(5);

                }else{


                    fieldType = FieldTypeName.valueOf((String) keyContents.get(5));
                    name = (String) keyContents.get(6);
                    value = (String) keyContents.get(7);


                }

                Field returnedField = getField(name, value, fieldType);


                final EntityVersion entityVersion = new EntityVersion(
                    new SimpleId((UUID)columnContents.get(1), (String)columnContents.get(2)), (UUID)columnContents.get(0));
//            //sanity check, nothing to do, skip it
//            if ( !columnList.hasNext() ) {
//                if(logger.isTraceEnabled()){
//                    logger.trace("No cells exist in partition for unique value [{}={}]",
//                        field.getName(), field.getValue().toString());
//                }
//                continue;
//            }




                /**
                 *  While iterating the rows, a rule is enforced to only EVER return the oldest UUID for the field.
                 *  This means the UUID with the oldest timestamp ( it was the original entity written for
                 *  the unique value ).
                 *
                 *  We do this to prevent cycling of unique value -> entity UUID mappings as this data is ordered by the
                 *  entity's version and not the entity's timestamp itself.
                 *
                 *  If newer entity UUIDs are encountered, they are removed from the unique value tables, however their
                 *  backing serialized entity data is left in tact in case a cleanup / audit is later needed.
                 */


                final UniqueValue uniqueValue =
                    new UniqueValueImpl(returnedField, entityVersion.getEntityId(), entityVersion.getEntityVersion());

                // set the initial candidate and move on
                if (candidates.size() == 0) {
                    candidates.add(uniqueValue);

                    if (logger.isTraceEnabled()) {
                        logger.trace("First entry for unique value [{}={}] found for application [{}], adding " +
                                "entry with entity id [{}] and entity version [{}] to the candidate list and continuing",
                            returnedField.getName(), returnedField.getValue().toString(), applicationId.getType(),
                            uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion());
                    }

                    continue;
                }

                if(!useReadRepair){

                    // take only the first
                    if (logger.isTraceEnabled()) {
                        logger.trace("Read repair not enabled for this request of unique value [{}={}], breaking out" +
                            " of cell loop", returnedField.getName(), returnedField.getValue().toString());
                    }
                    break;

                } else {


                    final int result = uniqueValueComparator.compare(uniqueValue, candidates.get(candidates.size() - 1));

                    if (result == 0) {

                        // do nothing, only versions can be newer and we're not worried about newer versions of same entity
                        if (logger.isTraceEnabled()) {
                            logger.trace("Current unique value [{}={}] entry has UUID [{}] equal to candidate UUID [{}]",
                                returnedField.getName(), returnedField.getValue().toString(), uniqueValue.getEntityId().getUuid(),
                                candidates.get(candidates.size() -1));
                        }

                        // update candidate w/ latest version
                        candidates.add(uniqueValue);

                    } else if (result < 0) {

                        // delete the duplicate from the unique value index
                        candidates.forEach(candidate -> {

                            logger.warn("Duplicate unique value [{}={}] found for application [{}], removing newer " +
                                    "entry with entity id [{}] and entity version [{}]", returnedField.getName(),
                                returnedField.getValue().toString(), applicationId.getUuid(),
                                candidate.getEntityId().getUuid(), candidate.getEntityVersion());

                            session.execute(deleteCQL(appScope, candidate));

                        });

                        // clear the transient candidates list
                        candidates.clear();

                        if (logger.isTraceEnabled()) {
                            logger.trace("Updating candidate unique value [{}={}] to entity id [{}] and " +
                                    "entity version [{}]", returnedField.getName(), returnedField.getValue().toString(),
                                uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion());

                        }

                        // add our new candidate to the list
                        candidates.add(uniqueValue);


                    } else {

                        logger.warn("Duplicate unique value [{}={}] found for application [{}], removing newer entry " +
                                "with entity id [{}] and entity version [{}].", returnedField.getName(), returnedField.getValue().toString(),
                            applicationId.getUuid(), uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion());

                        // delete the duplicate from the unique value index
                        session.execute(deleteCQL(appScope, uniqueValue));


                    }

                }

            }

            if ( candidates.size() > 0 ) {
                // take the last candidate ( should be the latest version) and add to the result set
                final UniqueValue returnValue = candidates.get(candidates.size() - 1);
                if (logger.isTraceEnabled()) {
                    logger.trace("Adding unique value [{}={}] with entity id [{}] and entity version [{}] to response set",
                        returnValue.getField().getName(), returnValue.getField().getValue().toString(),
                        returnValue.getEntityId().getUuid(), returnValue.getEntityVersion());
                }
                uniqueValueSet.addValue(returnValue);
            }


        }


        return uniqueValueSet;

    }