in stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java [56:213]
/**
 * Dispatches an incoming actor message to the matching unique-value handler.
 *
 * <p>Every {@code Request} instance is counted, and a debug line is logged on every
 * tenth one. {@code Reservation}, {@code Confirmation} and {@code Cancellation}
 * messages are then processed by their dedicated handlers; anything else is passed
 * to {@code unhandled}.
 *
 * @param message the message delivered to this actor
 */
public void onReceive(Object message) {

    if ( message instanceof Request ) {
        count++;
        if ( count % 10 == 0 && logger.isDebugEnabled() ) {
            logger.debug( "UniqueValueActor {} processed {} requests", name, count );
        }
    }

    if ( message instanceof Reservation ) {
        handleReservation( (Reservation) message );

    } else if ( message instanceof Confirmation ) {
        handleConfirmation( (Confirmation) message );

    } else if ( message instanceof Cancellation ) {
        handleCancellation( (Cancellation) message );

    } else {
        unhandled( message );
    }
}

/**
 * Reserves a unique value for the requesting owner unless it is already owned.
 *
 * <p>Replies {@code IS_UNIQUE} when the requester already owns the value or the
 * reservation was written, {@code NOT_UNIQUE} when a different owner is found, and
 * {@code ERROR} when anything throws. A copy of the reservation is published only
 * after a new reservation has been written.
 */
private void handleReservation( Reservation res ) {
    // NOTE(review): replies pass getSender() as the tell-sender argument as well;
    // getSelf() may have been intended - confirm before changing.
    try {
        Id owner = table.lookupOwner( res.getApplicationScope(), res.getOwner().getType(), res.getField() );

        if ( owner != null ) {
            // an owner already exists: unique only if it is the requester itself
            Response.Status status = owner.equals( res.getOwner() )
                ? Response.Status.IS_UNIQUE
                : Response.Status.NOT_UNIQUE;
            getSender().tell( new Response( status, res.getConsistentHashKey() ), getSender() );
            return;
        }

        table.reserve( res.getApplicationScope(), res.getOwner(), res.getOwnerVersion(), res.getField() );

        getSender().tell( new Response( Response.Status.IS_UNIQUE, res.getConsistentHashKey() ),
            getSender() );

        publishToRegions( new Reservation( res ) );

    } catch (Throwable t) {
        // NOTE(review): if publishing fails after the IS_UNIQUE reply above, the sender
        // is also sent this ERROR reply.
        getSender().tell( new Response( Response.Status.ERROR, res.getConsistentHashKey() ), getSender() );
        logger.error( "Error processing request", t );
    }
}

/**
 * Confirms a previously reserved unique value.
 *
 * <p>Replies {@code NOT_UNIQUE} when a different owner holds the value,
 * {@code BAD_REQUEST} when no owner is found (nothing was reserved), and
 * {@code IS_UNIQUE} after the confirmation has been written. In all three cases the
 * same response is also published. Any thrown error yields an {@code ERROR} reply.
 */
private void handleConfirmation( Confirmation con ) {
    try {
        Id owner = table.lookupOwner( con.getApplicationScope(), con.getOwner().getType(), con.getField() );

        final Response response;
        if ( owner != null && !owner.equals( con.getOwner() ) ) {
            // cannot reserve, somebody else owns the unique value
            response = new Response( Response.Status.NOT_UNIQUE, con.getConsistentHashKey() );

        } else if ( owner == null ) {
            // cannot commit without first reserving
            response = new Response( Response.Status.BAD_REQUEST, con.getConsistentHashKey() );

        } else {
            table.confirm( con.getApplicationScope(), con.getOwner(), con.getOwnerVersion(), con.getField() );
            response = new Response( Response.Status.IS_UNIQUE, con.getConsistentHashKey() );
        }

        getSender().tell( response, getSender() );
        publishToRegions( response );

    } catch (Throwable t) {
        getSender().tell( new Response( Response.Status.ERROR, con.getConsistentHashKey() ),
            getSender() );
        logger.error( "Error processing request", t );
    }
}

/**
 * Cancels a unique value held by the requesting owner.
 *
 * <p>Replies {@code NOT_UNIQUE} (without publishing) when a different owner holds
 * the value, {@code BAD_REQUEST} when no owner is found, and {@code SUCCESS} after
 * the value has been cancelled. In the latter two cases a copy of the cancellation
 * is published. Any thrown error yields an {@code ERROR} reply.
 */
private void handleCancellation( Cancellation can ) {
    try {
        Id owner = table.lookupOwner( can.getApplicationScope(), can.getOwner().getType(), can.getField() );

        if ( owner != null && !owner.equals( can.getOwner() ) ) {
            // cannot cancel, somebody else owns the unique value
            getSender().tell( new Response( Response.Status.NOT_UNIQUE, can.getConsistentHashKey() ),
                getSender() );
            return;
        }

        if ( owner == null ) {
            // cannot cancel unique value that does not exist
            getSender().tell( new Response( Response.Status.BAD_REQUEST, can.getConsistentHashKey() ),
                getSender() );
        } else {
            table.cancel( can.getApplicationScope(), can.getOwner(), can.getOwnerVersion(), can.getField() );
            logger.debug("Removing {} from unique values table", can.getConsistentHashKey());

            getSender().tell( new Response( Response.Status.SUCCESS, can.getConsistentHashKey() ),
                getSender() );
        }

        // published even when no record was found: the unique value record may have
        // already been cleaned up, so also clear cache
        publishToRegions( new Cancellation( can ) );

    } catch (Throwable t) {
        getSender().tell( new Response( Response.Status.ERROR, can.getConsistentHashKey() ),
            getSender() );
        logger.error( "Error processing request", t );
    }
}

/**
 * Publishes {@code payload} on the "content" topic with this actor as sender, to the
 * local region only when {@code uniqueValuesFig.getSkipRemoteRegions()} is true and
 * to all regions otherwise.
 */
private void publishToRegions( Object payload ) {
    if ( uniqueValuesFig.getSkipRemoteRegions() ) {
        actorSystemManager.publishToLocalRegion( "content", payload, getSelf() );
    } else {
        actorSystemManager.publishToAllRegions( "content", payload, getSelf() );
    }
}