public void onReceive()

in stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java [56:213]


    public void onReceive(Object message) {

        if ( message instanceof Request ) {
            Request req = (Request) message;

            count++;
            if (count % 10 == 0 && logger.isDebugEnabled()) {
                logger.debug( "UniqueValueActor {} processed {} requests", name, count );
            }
        }

        if ( message instanceof Reservation ) {
            Reservation res = (Reservation) message;

            // final Timer.Context context = metricsService.getReservationTimer().time();

            try {
                Id owner = table.lookupOwner( res.getApplicationScope(), res.getOwner().getType(), res.getField() );

                if ( owner != null && owner.equals( res.getOwner() )) {
                    // sender already owns this unique value
                    getSender().tell( new Response( Response.Status.IS_UNIQUE, res.getConsistentHashKey() ),
                        getSender() );
                    return;

                } else if ( owner != null && !owner.equals( res.getOwner() )) {
                    // tell sender value is not unique
                    getSender().tell( new Response( Response.Status.NOT_UNIQUE, res.getConsistentHashKey() ),
                        getSender() );
                    return;
                }

                table.reserve( res.getApplicationScope(), res.getOwner(), res.getOwnerVersion(), res.getField() );

                getSender().tell( new Response( Response.Status.IS_UNIQUE, res.getConsistentHashKey() ),
                    getSender() );
                
                if(uniqueValuesFig.getSkipRemoteRegions()) {
                	actorSystemManager.publishToLocalRegion( "content", new Reservation( res ), getSelf() );
                } else {
                	actorSystemManager.publishToAllRegions( "content", new Reservation( res ), getSelf() );
                }

            } catch (Throwable t) {

                getSender().tell( new Response( Response.Status.ERROR, res.getConsistentHashKey() ), getSender() );
                logger.error( "Error processing request", t );


            } finally {
                // context.stop();
            }

        } else if ( message instanceof Confirmation) {
            Confirmation con = (Confirmation) message;

            // final Timer.Context context = metricsService.getCommitmentTimer().time();

            try {
                Id owner = table.lookupOwner( con.getApplicationScope(), con.getOwner().getType(), con.getField() );

                if ( owner != null && !owner.equals( con.getOwner() )) {
                    // cannot reserve, somebody else owns the unique value
                    Response response  = new Response( Response.Status.NOT_UNIQUE, con.getConsistentHashKey());
                    getSender().tell( response, getSender() );
                    if(uniqueValuesFig.getSkipRemoteRegions()) {
                    	actorSystemManager.publishToLocalRegion( "content", response, getSelf() );
                    } else {
                    	actorSystemManager.publishToAllRegions( "content", response, getSelf() );
                    }
                    return;

                } else if ( owner == null ) {
                    // cannot commit without first reserving
                    Response response  = new Response( Response.Status.BAD_REQUEST, con.getConsistentHashKey());
                    getSender().tell( response, getSender() );
                    
                    if(uniqueValuesFig.getSkipRemoteRegions()) {
                    	actorSystemManager.publishToLocalRegion( "content", response, getSelf() );
                    } else {
                    	actorSystemManager.publishToAllRegions( "content", response, getSelf() );
                    }
                    
                    return;
                }

                table.confirm( con.getApplicationScope(), con.getOwner(), con.getOwnerVersion(), con.getField() );

                Response response = new Response( Response.Status.IS_UNIQUE, con.getConsistentHashKey() );
                getSender().tell( response, getSender() );

                if(uniqueValuesFig.getSkipRemoteRegions()) {
                	actorSystemManager.publishToLocalRegion( "content", response, getSelf() );
                } else {
                	actorSystemManager.publishToAllRegions( "content", response, getSelf() );
                }

            } catch (Throwable t) {
                getSender().tell( new Response( Response.Status.ERROR, con.getConsistentHashKey() ),
                    getSender() );
                logger.error( "Error processing request", t );

            } finally {
                // context.stop();
            }


        } else if ( message instanceof Cancellation ) {
            Cancellation can = (Cancellation) message;

            try {
                Id owner = table.lookupOwner( can.getApplicationScope(), can.getOwner().getType(), can.getField() );

                if ( owner != null && !owner.equals( can.getOwner() )) {
                    // cannot cancel, somebody else owns the unique value
                    getSender().tell( new Response( Response.Status.NOT_UNIQUE, can.getConsistentHashKey() ),
                        getSender() );
                    return;

                } else if ( owner == null ) {

                    // cannot cancel unique value that does not exist
                    getSender().tell( new Response( Response.Status.BAD_REQUEST, can.getConsistentHashKey() ),
                        getSender() );

                    // unique value record may have already been cleaned up, also clear cache
                    if(uniqueValuesFig.getSkipRemoteRegions()) {
                    	actorSystemManager.publishToLocalRegion( "content", new Cancellation( can ), getSelf() );
                    } else {
                    	actorSystemManager.publishToAllRegions( "content", new Cancellation( can ), getSelf() );
                    }

                    return;
                }

                table.cancel( can.getApplicationScope(), can.getOwner(), can.getOwnerVersion(), can.getField() );

                logger.debug("Removing {} from unique values table", can.getConsistentHashKey());

                getSender().tell( new Response( Response.Status.SUCCESS, can.getConsistentHashKey() ),
                    getSender() );

                if(uniqueValuesFig.getSkipRemoteRegions()) {
                	actorSystemManager.publishToLocalRegion( "content", new Cancellation( can ), getSelf() );
                } else {
                	actorSystemManager.publishToAllRegions( "content", new Cancellation( can ), getSelf() );
                }

            } catch (Throwable t) {
                getSender().tell( new Response( Response.Status.ERROR, can.getConsistentHashKey() ),
                    getSender() );
                logger.error( "Error processing request", t );
            }

        } else {
            unhandled( message );
        }
    }