public void handleNotification()

in src/org/apache/pig/impl/util/SpillableMemoryManager.java [193:357]


    public void handleNotification(Notification n, Object o) {
        CompositeData cd = (CompositeData) n.getUserData();
        MemoryNotificationInfo info = MemoryNotificationInfo.from(cd);
        // free the amount exceeded over the threshold and then a further half
        // so if threshold = heapmax/2, we will be trying to free
        // used - heapmax/2 + heapmax/4
        long toFree = 0L;
        if(n.getType().equals(MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED)) {
            toFree = info.getUsage().getUsed() - memoryThresholdSize + (long)(memoryThresholdSize * 0.5);

            //log
            String msg = "memory handler call- Usage threshold "
                + info.getUsage() + ", toFree = " + toFree;
            if(!firstUsageThreshExceededLogged){
                log.info("first " + msg);
                firstUsageThreshExceededLogged = true;
            }else{
                log.debug(msg);
            }
        } else { // MEMORY_COLLECTION_THRESHOLD_EXCEEDED CASE
            toFree = info.getUsage().getUsed() - collectionThresholdSize + (long)(collectionThresholdSize * 0.5);

            //log
            String msg = "memory handler call - Collection threshold "
                + info.getUsage() + ", toFree = " + toFree;
            if(!firstCollectionThreshExceededLogged){
                log.info("first " + msg);
                firstCollectionThreshExceededLogged = true;
            }else{
                log.debug(msg);
            }

        }
        if (toFree < 0) {
            log.debug("low memory handler returning " +
                "because there is nothing to free");
            return;
        }

        // Use a separate spillLock to block multiple handleNotification calls
        synchronized (spillLock) {
            synchronized(spillables) {
                spillablesSR = new LinkedList<SpillablePtr>();
                for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) {
                    Spillable s = i.next().get();
                    if (s == null) {
                        i.remove();
                        continue;
                    }
                    // Create a list with spillable size for stable sorting. Refer PIG-4012
                    spillablesSR.add(new SpillablePtr(s, s.getMemorySize()));
                }
                log.debug("Spillables list size: " + spillablesSR.size());
                Collections.sort(spillablesSR, new Comparator<SpillablePtr>() {
                    @Override
                    public int compare(SpillablePtr o1Ref, SpillablePtr o2Ref) {
                        long o1Size = o1Ref.getMemorySize();
                        long o2Size = o2Ref.getMemorySize();

                        if (o1Size == o2Size) {
                            return 0;
                        }
                        if (o1Size < o2Size) {
                            return 1;
                        }
                        return -1;
                    }
                });
                // Block new bags from being registered
                blockRegisterOnSpill = true;
            }

            try {
                long estimatedFreed = 0;
                int numObjSpilled = 0;
                boolean invokeGC = false;
                boolean extraGCCalled = false;
                boolean isGroupingSpillable = false;
                for (Iterator<SpillablePtr> i = spillablesSR.iterator(); i.hasNext();) {
                    SpillablePtr sPtr = i.next();
                    Spillable s = sPtr.get();
                    // Still need to check for null here, even after we removed
                    // above, because the reference may have gone bad on us
                    // since the last check.
                    if (s == null) {
                        i.remove();
                        continue;
                    }
                    long toBeFreed = sPtr.getMemorySize();
                    log.debug("Memorysize = "+toBeFreed+", spillFilesizethreshold = "+spillFileSizeThreshold+", gcactivationsize = "+gcActivationSize);
                    // Don't keep trying if the rest of files are too small
                    if (toBeFreed < spillFileSizeThreshold) {
                        log.debug("spilling small files - getting out of memory handler");
                        break ;
                    }
                    isGroupingSpillable = (s instanceof GroupingSpillable);
                    // If single Spillable is bigger than the threshold,
                    // we force GC to make sure we really need to keep this
                    // object before paying for the expensive spill().
                    // Done at most once per handleNotification.
                    // Do not invoke extraGC for GroupingSpillable. Its size will always exceed
                    // extraGCSpillSizeThreshold and the data is always strong referenced.
                    if( !extraGCCalled && extraGCSpillSizeThreshold != 0
                        && toBeFreed > extraGCSpillSizeThreshold  && !isGroupingSpillable
                        && n.getType().equals(MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED)) {
                        log.debug("Single spillable has size " + toBeFreed + "bytes. Calling extra gc()");
                        // this extra assignment to null is needed so that gc can free the
                        // spillable if nothing else is pointing at it
                        s = null;
                        System.gc();
                        extraGCCalled = true;
                        // checking again to see if this reference is still valid
                        s = sPtr.get();
                        if (s == null) {
                            i.remove();
                            accumulatedFreeSize = 0;
                            invokeGC = false;
                            continue;
                        }
                    }
                    // Unblock registering of new bags temporarily as aggregation
                    // of POPartialAgg requires new record to be loaded.
                    blockRegisterOnSpill = !isGroupingSpillable;
                    long numSpilled;
                    try {
                        numSpilled = s.spill();
                    } finally {
                        blockRegisterOnSpill = true;
                    }

                    if (numSpilled > 0) {
                        numObjSpilled++;
                        estimatedFreed += toBeFreed;
                        accumulatedFreeSize += toBeFreed;
                    }
                    // This should significantly reduce the number of small files
                    // in case that we have a lot of nested bags
                    if (accumulatedFreeSize > gcActivationSize) {
                        invokeGC = true;
                    }

                    if (estimatedFreed > toFree) {
                        log.debug("Freed enough space - getting out of memory handler");
                        invokeGC = true;
                        break;
                    }
                }
                spillablesSR = null;
                /* Poke the GC again to see if we successfully freed enough memory */
                if(invokeGC) {
                    System.gc();
                    // now that we have invoked the GC, reset accumulatedFreeSize
                    accumulatedFreeSize = 0;
                }
                if(estimatedFreed > 0){
                    String msg = "Spilled an estimate of " + estimatedFreed +
                    " bytes from " + numObjSpilled + " objects. " + info.getUsage();;
                    log.info(msg);
                }
            } finally {
                blockRegisterOnSpill = false;
            }
        }

    }