in stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java [129:370]
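/**
 * Compacts a shard entry group by copying every edge from the group's source (read) shards into the
 * compaction target, deleting the copies from the sources, and keeping each source's shardEnd metadata
 * up to date while the move is in flight. Once a pass moves zero edges, the drained source shards that
 * can be deleted are removed and the target is re-written with its compacted flag set.
 */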
public CompactionResult compact( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
                                 final ShardEntryGroup group ) {

    final long startTime = timeService.getCurrentTime();

    Preconditions.checkNotNull( group, "group cannot be null" );
    Preconditions.checkArgument( group.isCompactionPending(), "Compaction must be pending" );
    Preconditions
        .checkArgument( group.shouldCompact( startTime ), "Compaction cannot be run yet. Ignoring compaction." );

    if(logger.isTraceEnabled()) {
        logger.trace("Compacting shard group. Audit count is {} ", countAudits.get());
    }

    final CompactionResult.CompactionBuilder resultBuilder = CompactionResult.builder();
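    // the compaction target is the shard that new writes should land in; every other read shard in the
    // group is a source that must be drained into it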
    final Shard targetShard = group.getCompactionTarget();

    final Set<Shard> sourceShards = new HashSet<>( group.getReadShards() );

    //remove the target
    sourceShards.remove( targetShard );

    final UUID timestamp = UUIDGenerator.newTimeUUID();
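    // edges with timestamps at or above this pivot are moved into the target; once the descending scan
    // below drops under the pivot, the remaining edges belong to older shards and are left in place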
    final long newShardPivot = targetShard.getShardIndex();

    final int maxWorkSize = graphFig.getScanPageSize();

    /**
     * Keep a running count of every edge we move out of the source shards.
     */
    long totalEdgeCount = 0;
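    // drain each source shard independently; edges are copied into the target first and only deleted from
    // the source once the copy has been executed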
    for ( Shard sourceShard : sourceShards ) {
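        // three batches per source shard: copies into the target row, deletes from the source row, and
        // shard metadata updates that advance the source's shardEnd marker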
        final MutationBatch newRowBatch = keyspace.prepareMutationBatch();
        final MutationBatch deleteRowBatch = keyspace.prepareMutationBatch();
        final MutationBatch updateShardMetaBatch = keyspace.prepareMutationBatch();

        long edgeCount = 0;
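        // scan the source shard's edges newest-first; the descending order lets the loop stop as soon as
        // it crosses below the target's pivot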
        Iterator<MarkedEdge> edges = edgeMeta
            .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, Collections.singleton( sourceShard ),
                Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING );
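        // tracks the last edge visited so the source shard's shardEnd metadata can be advanced as the
        // copy progresses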
        MarkedEdge shardEnd = null;

        while ( edges.hasNext() ) {

            final MarkedEdge edge = edges.next();

            final long edgeTimestamp = edge.getTimestamp();

            shardEnd = edge;

            /**
             * The edge's timestamp falls below the pivot, so it belongs to an older shard; stop copying.
             */
            if ( edgeTimestamp < newShardPivot ) {
                break;
            }
            newRowBatch.mergeShallow( edgeMeta
                .writeEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, targetShard, edge,
                    timestamp ) );

            deleteRowBatch.mergeShallow( edgeMeta
                .deleteEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, sourceShard, edge,
                    timestamp ) );

            edgeCount++;
            // if we're at our count, execute the mutation of writing the edges to the new row, then remove them
            // from the old rows
            if ( edgeCount % maxWorkSize == 0 ) {

                try {

                    // write the edges into the new shard atomically so we know they all succeed
                    newRowBatch.withAtomicBatch(true).execute();

                    // update the shard end after each batch so any reads during the transition stay as close
                    // to current as possible
                    sourceShard.setShardEnd(
                        Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp()))
                    );

                    if(logger.isTraceEnabled()) {
                        logger.trace("Updating shard {} during batch removal with shardEnd {}", sourceShard, shardEnd);
                    }

                    updateShardMetaBatch.mergeShallow(
                        edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta));

                    // deliberately block this thread before deleting the old edges so readers never see a gap;
                    // duplicates are filtered during graph seeking, so the temporary overlap is fine
                    Thread.sleep(1000);

                    if(logger.isTraceEnabled()) {
                        logger.trace("Deleting batch of {} from old shard", maxWorkSize);
                    }

                    deleteRowBatch.withAtomicBatch(true).execute();

                    updateShardMetaBatch.execute();
                }
                catch ( Throwable t ) {
                    logger.error( "Unable to move edges from shard {} to shard {}", sourceShard, targetShard, t );
                }

                totalEdgeCount += edgeCount;
                edgeCount = 0;
            }
        }
        totalEdgeCount += edgeCount;
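        // flush whatever is still staged once the scan ends, then record the final shardEnd for this source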
        try {

            // write the edges into the new shard atomically so we know they all succeed
            newRowBatch.withAtomicBatch(true).execute();

            // deliberately block this thread before deleting the old edges so readers never see a gap;
            // duplicates are filtered during graph seeking, so the temporary overlap is fine
            Thread.sleep(1000);

            if(logger.isTraceEnabled()) {
                logger.trace("Deleting remaining {} edges from old shard", edgeCount);
            }

            deleteRowBatch.withAtomicBatch(true).execute();
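            // record the final shardEnd for this source shard and persist the updated shard metadata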
            if (shardEnd != null){

                sourceShard.setShardEnd(
                    Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp()))
                );

                if(logger.isTraceEnabled()) {
                    logger.trace("Updating shard {} for the last time with shardEnd {}", sourceShard, shardEnd);
                }

                updateShardMetaBatch.mergeShallow( edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta));
                updateShardMetaBatch.execute();
            }
        }
        catch ( Throwable t ) {
            logger.error( "Unable to move edges to target shard {}", targetShard, t );
        }
    }
    if (logger.isTraceEnabled()) {
        logger.trace("Finished compacting {} shards and moved {} edges", sourceShards, totalEdgeCount);
    }

    resultBuilder.withCopiedEdges( totalEdgeCount ).withSourceShards( sourceShards ).withTargetShard( targetShard );
    /**
     * If we didn't move anything this pass, the sources are drained and the target can be marked as
     * compacted. If we did move something, either the previous pass missed those edges or writers are
     * still not writing exclusively to the target shard, so another pass is required.
     */
    if ( totalEdgeCount == 0 ) {

        // the sources are fully drained, so we can safely remove any source shards that are eligible for
        // deletion before marking the target as compacted below
        final MutationBatch shardRemovalRollup = keyspace.prepareMutationBatch();
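        // every removable source's shard-meta delete is merged into shardRemovalRollup and executed as one
        // batch below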
        for ( Shard source : sourceShards ) {

            // if we can't safely delete it, don't do so
            if ( !group.canBeDeleted( source ) ) {
                continue;
            }

            logger.info( "Source shards have been fully drained. Removing shard {}", source );

            final MutationBatch shardRemoval = edgeShardSerialization.removeShardMeta( scope, source, edgeMeta );
            shardRemovalRollup.mergeShallow( shardRemoval );

            resultBuilder.withRemovedShard( source );
        }

        try {
            shardRemovalRollup.execute();

            // invalidate the shard cache so we can be sure that all read shards are up to date
            nodeShardCache.invalidate(scope, edgeMeta);
        }
        catch ( ConnectionException e ) {
            throw new RuntimeException( "Unable to connect to cassandra", e );
        }
        // overwrite our shard index with a newly created one that has been marked as compacted
        Shard compactedShard = new Shard( targetShard.getShardIndex(), timeService.getCurrentTime(), true );
        compactedShard.setShardEnd(Optional.absent());

        if(logger.isTraceEnabled()) {
            logger.trace("Shard has been fully compacted. Marking shard {} as compacted in Cassandra", compactedShard);
        }
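        // persist the compacted marker and invalidate the cache so subsequent reads of this shard group
        // see the target as compacted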
        final MutationBatch updateMark = edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta );

        try {
            updateMark.execute();

            // invalidate the shard cache so we can be sure that all read shards are up to date
            nodeShardCache.invalidate(scope, edgeMeta);
        }
        catch ( ConnectionException e ) {
            throw new RuntimeException( "Unable to connect to cassandra", e );
        }

        resultBuilder.withCompactedShard( compactedShard );
    }

    return resultBuilder.build();
}