public CompactionResult compact( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, final ShardEntryGroup group )

in stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java [129:370]
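
Drains every non-target shard in the group into the compaction target: edges are copied and deleted in paged, atomic batches, and once a pass moves no edges, the drained source shards are removed and the target is re-written as compacted.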


    public CompactionResult compact( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
                                     final ShardEntryGroup group ) {


        final long startTime = timeService.getCurrentTime();


        Preconditions.checkNotNull( group, "group cannot be null" );
        Preconditions.checkArgument( group.isCompactionPending(), "Compaction must be pending" );
        Preconditions
            .checkArgument( group.shouldCompact( startTime ), "Compaction cannot be run yet.  Ignoring compaction." );
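        // the checks above fail fast unless the group has a pending compaction and is eligible to run now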

        if(logger.isTraceEnabled()) {
            logger.trace("Compacting shard group. Audit count is {} ", countAudits.get());
        }
        final CompactionResult.CompactionBuilder resultBuilder = CompactionResult.builder();

        final Shard targetShard = group.getCompactionTarget();

        final Set<Shard> sourceShards = new HashSet<>( group.getReadShards() );

        // remove the compaction target; only the remaining source shards are drained
        sourceShards.remove( targetShard );


        final UUID timestamp = UUIDGenerator.newTimeUUID();

        final long newShardPivot = targetShard.getShardIndex();

        final int maxWorkSize = graphFig.getScanPageSize();
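        // edges are moved in pages of maxWorkSize; each full page is written, then deleted, as a unit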




        /**
         * Track the total number of edges moved across all source shards
         */
        long totalEdgeCount = 0;


        for ( Shard sourceShard : sourceShards ) {

            final MutationBatch newRowBatch = keyspace.prepareMutationBatch();
            final MutationBatch deleteRowBatch = keyspace.prepareMutationBatch();
            final MutationBatch updateShardMetaBatch = keyspace.prepareMutationBatch();

            long edgeCount = 0;

            Iterator<MarkedEdge> edges = edgeMeta
                .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, Collections.singleton( sourceShard ),
                    Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING );
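            // edges come back newest-first (DESCENDING); we copy until we pass below the target pivot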

            MarkedEdge shardEnd = null;
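            // track the last edge visited so the source shard's end bound can be advanced as pages drain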

            while ( edges.hasNext() ) {
                final MarkedEdge edge = edges.next();

                final long edgeTimestamp = edge.getTimestamp();

                shardEnd = edge;

                /**
                 * The edge falls below the target shard's pivot, so it belongs in a lower shard; stop copying
                 */
                if ( edgeTimestamp < newShardPivot ) {
                    break;
                }
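
                // stage the copy into the target shard and the matching delete from the source;
                // the staged batches are flushed page by page below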

                newRowBatch.mergeShallow( edgeMeta
                        .writeEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, targetShard, edge,
                            timestamp ) );

                deleteRowBatch.mergeShallow( edgeMeta
                        .deleteEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, sourceShard, edge,
                            timestamp ) );


                edgeCount++;



                // if we're at our count, execute the mutation of writing the edges to the new row, then remove them
                // from the old rows
                if ( edgeCount % maxWorkSize == 0 ) {

                    try {

                        // write the edges into the new shard atomically so we know they all succeed
                        newRowBatch.withAtomicBatch(true).execute();


                        // Update the shard end after each batch so reads during the transition stay as close to current as possible
                        sourceShard.setShardEnd(
                            Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp()))
                        );

                        if(logger.isTraceEnabled()) {
                            logger.trace("Updating shard {} during batch removal with shardEnd {}", sourceShard, shardEnd);
                        }
                        updateShardMetaBatch.mergeShallow(
                            edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta));

                        // deliberately block this thread before deleting the old edges so reads never see a gap;
                        // duplicates are filtered during graph seeking, so the brief overlap is safe
                        Thread.sleep(1000);

                        if(logger.isTraceEnabled()) {
                            logger.trace("Deleting batch of {} from old shard", maxWorkSize);
                        }
                        deleteRowBatch.withAtomicBatch(true).execute();

                        updateShardMetaBatch.execute();


                    }
                    catch ( Throwable t ) {
                        logger.error( "Unable to move edges from shard {} to shard {}", sourceShard, targetShard, t );
                    }

                    totalEdgeCount += edgeCount;
                    edgeCount = 0;
                }
            }

            totalEdgeCount += edgeCount;
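            // flush the final, partial page using the same write -> pause -> delete -> update-meta sequence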

            try {

                // write the edges into the new shard atomically so we know they all succeed
                newRowBatch.withAtomicBatch(true).execute();

                // deliberately block this thread before deleting the old edges so reads never see a gap;
                // duplicates are filtered during graph seeking, so the brief overlap is safe
                Thread.sleep(1000);

                if(logger.isTraceEnabled()) {
                    logger.trace("Deleting remaining {} edges from old shard", edgeCount);
                }
                deleteRowBatch.withAtomicBatch(true).execute();

                if (shardEnd != null){

                    sourceShard.setShardEnd(
                        Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp()))
                    );

                    if(logger.isTraceEnabled()) {
                        logger.trace("Updating for last time shard {} with shardEnd {}", sourceShard, shardEnd);
                    }
                    updateShardMetaBatch.mergeShallow( edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta));
                    updateShardMetaBatch.execute();
                }


            }
            catch ( Throwable t ) {
                logger.error( "Unable to move edges to target shard {}", targetShard, t );
            }
        }

        if (logger.isTraceEnabled()) {
            logger.trace("Finished compacting {} shards and moved {} edges", sourceShards, totalEdgeCount);
        }

        resultBuilder.withCopiedEdges( totalEdgeCount ).withSourceShards( sourceShards ).withTargetShard( targetShard );

        /**
         * If we didn't move anything this pass, mark the shard as compacted.  If we did move something,
         * it means we missed edges on an earlier pass, or some writer is still not writing
         * exclusively to the target shard.
         */
        if ( totalEdgeCount == 0 ) {


            // before marking the target as compacted, remove any source shards that have been
            // fully drained and are safe to delete

            final MutationBatch shardRemovalRollup = keyspace.prepareMutationBatch();
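            // roll every shard-meta removal into this single batch so the drained sources are removed together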

            for ( Shard source : sourceShards ) {

                //if we can't safely delete it, don't do so
                if ( !group.canBeDeleted( source ) ) {
                    continue;
                }

                logger.info( "Source shards have been fully drained.  Removing shard {}", source );

                final MutationBatch shardRemoval = edgeShardSerialization.removeShardMeta( scope, source, edgeMeta );
                shardRemovalRollup.mergeShallow( shardRemoval );

                resultBuilder.withRemovedShard( source );
            }


            try {
                shardRemovalRollup.execute();

                // invalidate the shard cache so we can be sure that all read shards are up to date
                nodeShardCache.invalidate(scope, edgeMeta);
            }
            catch ( ConnectionException e ) {
                throw new RuntimeException( "Unable to connect to cassandra", e );
            }

            // Overwrite the shard entry with a new shard at the same index, marked as compacted
            Shard compactedShard = new Shard( targetShard.getShardIndex(), timeService.getCurrentTime(), true );
            compactedShard.setShardEnd(Optional.absent());
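            // clearing the end bound: presumably none is needed once every edge lives in the compacted shard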

            if(logger.isTraceEnabled()) {
                logger.trace("Shard has been fully compacted.  Marking shard {} as compacted in Cassandra", compactedShard);
            }

            final MutationBatch updateMark = edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta );
            try {
                updateMark.execute();

                // invalidate the shard cache so we can be sure that all read shards are up to date
                nodeShardCache.invalidate(scope, edgeMeta);
            }
            catch ( ConnectionException e ) {
                throw new RuntimeException( "Unable to connect to cassandra", e );
            }

            resultBuilder.withCompactedShard( compactedShard );
        }

        return resultBuilder.build();
    }
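
For orientation, a minimal caller sketch; the compaction, scope, edgeMeta, and group names below are assumptions for illustration, as are the CompactionResult accessors, none of which are confirmed by this excerpt:

    // Hypothetical usage sketch -- not part of the source file.
    final long now = timeService.getCurrentTime();

    // compact() requires both preconditions to hold, or checkArgument throws IllegalArgumentException
    if ( group.isCompactionPending() && group.shouldCompact( now ) ) {
        final CompactionResult result = compaction.compact( scope, edgeMeta, group );
        // getCopiedEdges()/getTargetShard() are assumed accessors matching the builder's with* methods
        logger.info( "Moved {} edges into shard {}", result.getCopiedEdges(), result.getTargetShard() );
    }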