in src/org/apache/pig/impl/util/SpillableMemoryManager.java [193:357]
/**
 * Reacts to a JMX memory notification by spilling registered {@link Spillable}s
 * until an estimated amount of heap has been released.
 *
 * <p>The target amount ({@code toFree}) is the current usage above the threshold that
 * fired plus half of that threshold; e.g. for a threshold of heapmax/2 this is
 * {@code used - heapmax/2 + heapmax/4}. {@code memoryThresholdSize} is used for
 * {@code MEMORY_THRESHOLD_EXCEEDED}; every other notification type is handled as a
 * collection-threshold notification using {@code collectionThresholdSize}. If the
 * target is negative the method returns without spilling.
 *
 * <p>Concurrent invocations are serialized on {@code spillLock}. The registered
 * spillables are snapshotted under {@code spillables} together with their memory size
 * at snapshot time (PIG-4012), sorted largest first, and spilled in that order. The loop
 * stops when the estimated amount freed exceeds the target, or when the next candidate
 * is smaller than {@code spillFileSizeThreshold}. For a usage-threshold notification, a
 * single non-{@code GroupingSpillable} candidate larger than a non-zero
 * {@code extraGCSpillSizeThreshold} triggers at most one extra {@code System.gc()}
 * before spilling, in case the object is no longer reachable. A final
 * {@code System.gc()} is requested when enough was freed or when the accumulated freed
 * size exceeds {@code gcActivationSize}.
 *
 * <p>During the spill phase {@code blockRegisterOnSpill} is set to block new bags from
 * registering, except while a {@code GroupingSpillable} is being spilled. The flag is
 * always cleared on exit.
 *
 * @param n the memory notification; its user data is cast to {@link CompositeData}
 *          and decoded as a {@link MemoryNotificationInfo}
 * @param o the handback object (not used by this method)
 */
public void handleNotification(Notification n, Object o) {
    CompositeData cd = (CompositeData) n.getUserData();
    MemoryNotificationInfo info = MemoryNotificationInfo.from(cd);
    // Evaluated once: the type is needed both here and inside the spill loop.
    final boolean usageThresholdExceeded =
            MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED.equals(n.getType());

    // free the amount exceeded over the threshold and then a further half
    // so if threshold = heapmax/2, we will be trying to free
    // used - heapmax/2 + heapmax/4
    long toFree;
    if (usageThresholdExceeded) {
        toFree = info.getUsage().getUsed() - memoryThresholdSize
                + (long) (memoryThresholdSize * 0.5);
        String msg = "memory handler call- Usage threshold "
                + info.getUsage() + ", toFree = " + toFree;
        // Log the first occurrence at INFO, later ones at DEBUG.
        if (!firstUsageThreshExceededLogged) {
            log.info("first " + msg);
            firstUsageThreshExceededLogged = true;
        } else {
            log.debug(msg);
        }
    } else { // MEMORY_COLLECTION_THRESHOLD_EXCEEDED CASE
        toFree = info.getUsage().getUsed() - collectionThresholdSize
                + (long) (collectionThresholdSize * 0.5);
        String msg = "memory handler call - Collection threshold "
                + info.getUsage() + ", toFree = " + toFree;
        // Log the first occurrence at INFO, later ones at DEBUG.
        if (!firstCollectionThreshExceededLogged) {
            log.info("first " + msg);
            firstCollectionThreshExceededLogged = true;
        } else {
            log.debug(msg);
        }
    }
    if (toFree < 0) {
        log.debug("low memory handler returning " +
                "because there is nothing to free");
        return;
    }

    // Use a separate spillLock to block multiple handleNotification calls
    synchronized (spillLock) {
        synchronized (spillables) {
            spillablesSR = new LinkedList<SpillablePtr>();
            for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) {
                Spillable s = i.next().get();
                if (s == null) {
                    // The spillable has already been reclaimed; drop its registration.
                    i.remove();
                    continue;
                }
                // Create a list with spillable size for stable sorting. Refer PIG-4012
                spillablesSR.add(new SpillablePtr(s, s.getMemorySize()));
            }
            log.debug("Spillables list size: " + spillablesSR.size());
            // Sort in descending order of size so the largest spillables go first.
            Collections.sort(spillablesSR, new Comparator<SpillablePtr>() {
                @Override
                public int compare(SpillablePtr o1Ref, SpillablePtr o2Ref) {
                    long o1Size = o1Ref.getMemorySize();
                    long o2Size = o2Ref.getMemorySize();
                    if (o1Size == o2Size) {
                        return 0;
                    }
                    return (o1Size < o2Size) ? 1 : -1;
                }
            });
            // Block new bags from being registered
            blockRegisterOnSpill = true;
        }
        try {
            long estimatedFreed = 0;
            int numObjSpilled = 0;
            boolean invokeGC = false;
            boolean extraGCCalled = false;
            for (Iterator<SpillablePtr> i = spillablesSR.iterator(); i.hasNext();) {
                SpillablePtr sPtr = i.next();
                Spillable s = sPtr.get();
                // Still need to check for null here, even after we removed
                // above, because the reference may have gone bad on us
                // since the last check.
                if (s == null) {
                    i.remove();
                    continue;
                }
                long toBeFreed = sPtr.getMemorySize();
                // Guarded: this message is otherwise built for every candidate.
                if (log.isDebugEnabled()) {
                    log.debug("Memorysize = " + toBeFreed + ", spillFilesizethreshold = "
                            + spillFileSizeThreshold + ", gcactivationsize = " + gcActivationSize);
                }
                // Don't keep trying if the rest of files are too small
                // (the list is sorted largest first, so all remaining entries are smaller).
                if (toBeFreed < spillFileSizeThreshold) {
                    log.debug("spilling small files - getting out of memory handler");
                    break;
                }
                boolean isGroupingSpillable = (s instanceof GroupingSpillable);
                // If single Spillable is bigger than the threshold,
                // we force GC to make sure we really need to keep this
                // object before paying for the expensive spill().
                // Done at most once per handleNotification.
                // Do not invoke extraGC for GroupingSpillable. Its size will always exceed
                // extraGCSpillSizeThreshold and the data is always strong referenced.
                if (!extraGCCalled && extraGCSpillSizeThreshold != 0
                        && toBeFreed > extraGCSpillSizeThreshold && !isGroupingSpillable
                        && usageThresholdExceeded) {
                    log.debug("Single spillable has size " + toBeFreed + " bytes. Calling extra gc()");
                    // this extra assignment to null is needed so that gc can free the
                    // spillable if nothing else is pointing at it
                    s = null;
                    System.gc();
                    extraGCCalled = true;
                    // checking again to see if this reference is still valid
                    s = sPtr.get();
                    if (s == null) {
                        i.remove();
                        // A full GC was just requested, so restart the accumulation
                        // that decides whether another GC is needed.
                        accumulatedFreeSize = 0;
                        invokeGC = false;
                        continue;
                    }
                }
                // Unblock registering of new bags temporarily as aggregation
                // of POPartialAgg requires new record to be loaded.
                blockRegisterOnSpill = !isGroupingSpillable;
                long numSpilled;
                try {
                    numSpilled = s.spill();
                } finally {
                    blockRegisterOnSpill = true;
                }
                if (numSpilled > 0) {
                    numObjSpilled++;
                    estimatedFreed += toBeFreed;
                    accumulatedFreeSize += toBeFreed;
                }
                // This should significantly reduce the number of small files
                // in case that we have a lot of nested bags
                if (accumulatedFreeSize > gcActivationSize) {
                    invokeGC = true;
                }
                if (estimatedFreed > toFree) {
                    log.debug("Freed enough space - getting out of memory handler");
                    invokeGC = true;
                    break;
                }
            }
            // Drop the snapshot before the GC below so it cannot keep anything alive.
            spillablesSR = null;
            /* Poke the GC again to see if we successfully freed enough memory */
            if (invokeGC) {
                System.gc();
                // now that we have invoked the GC, reset accumulatedFreeSize
                accumulatedFreeSize = 0;
            }
            if (estimatedFreed > 0) {
                String msg = "Spilled an estimate of " + estimatedFreed +
                        " bytes from " + numObjSpilled + " objects. " + info.getUsage();
                log.info(msg);
            }
        } finally {
            // Release the snapshot even if a spill() call threw before the reset above.
            spillablesSR = null;
            blockRegisterOnSpill = false;
        }
    }
}