in stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java [123:242]
private DeleteResult maybeDeleteShardInternal( final ApplicationScope applicationScope,
final DirectedEdgeMeta directedEdgeMeta,
final ShardEntryGroup shardEntryGroup,
final Iterator<MarkedEdge> edgeIterator ) {
//Use ths to TEMPORARILY remove deletes from occurring
//return DeleteResult.NO_OP;
if (logger.isTraceEnabled()) logger.trace( "Beginning audit of shard group {}", shardEntryGroup );
/**
* Compaction is pending, we cannot check it
*/
if ( shardEntryGroup.isCompactionPending() ) {
if (logger.isTraceEnabled()) logger.trace( "Shard group {} is compacting, not auditing group", shardEntryGroup );
return DeleteResult.COMPACTION_PENDING;
}
if (logger.isTraceEnabled()) logger.trace( "Shard group {} has no compaction pending", shardEntryGroup );
final long currentTime = timeService.getCurrentTime();
if ( shardEntryGroup.isNew( currentTime ) ) {
if (logger.isTraceEnabled()) logger.trace( "Shard group {} contains a shard that is is too new, not auditing group", shardEntryGroup );
return DeleteResult.TOO_NEW;
}
if (logger.isTraceEnabled()) {
logger.trace("Shard group {} has passed the delta timeout at {}", shardEntryGroup, currentTime);
}
/**
* We have edges, and therefore cannot delete them
*/
if ( edgeIterator.hasNext() ) {
if (logger.isTraceEnabled()) {
logger.trace("Shard group {} has edges, not deleting", shardEntryGroup);
}
return DeleteResult.CONTAINS_EDGES;
}
if (logger.isTraceEnabled()) {
logger.trace("Shard group {} has no edges continuing to delete", shardEntryGroup, currentTime);
}
//now we can proceed based on the shard meta state and we don't have any edge
DeleteResult result = DeleteResult.NO_OP;
MutationBatch rollup = null;
for ( final Shard shard : shardEntryGroup.getReadShards() ) {
//skip the min shard
if(shard.isMinShard()){
if (logger.isTraceEnabled()) {
logger.trace("Shard {} in group {} is the minimum, not deleting", shard, shardEntryGroup);
}
continue;
}
//The shard is not compacted, we cannot remove it. This should never happen, a bit of an "oh shit" scenario.
//the isCompactionPending should return false in this case
if(!shard.isCompacted()){
logger.warn( "Shard {} in group {} is not compacted yet was checked. Short circuiting", shard, shardEntryGroup );
return DeleteResult.NO_OP;
}
if(shard.isDeleted()){
if(logger.isTraceEnabled()){
logger.trace("Shard {} already deleted. Short circuiting.", shard);
}
return DeleteResult.NO_OP;
}
shard.setDeleted(true);
final MutationBatch setShardDeletedFlagMutation =
edgeShardSerialization.writeShardMeta(applicationScope, shard, directedEdgeMeta );
/* Previously the below was used for actually deleting the shard vs.a marking strategy with read filtering
final MutationBatch shardRemovalMutation =
edgeShardSerialization.removeShardMeta( applicationScope, shard, directedEdgeMeta );
*/
if ( rollup == null ) {
rollup = setShardDeletedFlagMutation;
}
else {
rollup.mergeShallow( setShardDeletedFlagMutation );
}
result = DeleteResult.DELETED;
logger.info( "Marking shard {} as deleted in group {}", shard, shardEntryGroup );
}
if( rollup != null) {
try {
rollup.execute();
}
catch ( ConnectionException e ) {
logger.error( "Unable to execute shard deletion", e );
throw new RuntimeException( "Unable to execute shard deletion", e );
}
}
if (logger.isTraceEnabled()) {
logger.trace("Completed auditing shard group {}", shardEntryGroup);
}
return result;
}