in hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java [983:1090]
public ICachedPage confiscatePage(long dpid) throws HyracksDataException {
while (true) {
int startCleanedCount = cleanerThread.cleanedCount;
ICachedPage returnPage = null;
CachedPage victim = (CachedPage) pageReplacementStrategy.findVictim();
if (victim != null) {
if(DEBUG) {
assert !victim.confiscated.get();
}
// find a page that would possibly be evicted anyway
// Case 1 from findPage()
if (victim.dpid < 0) { // new page
if (victim.pinCount.get() != 1) {
victim.pinCount.decrementAndGet();
continue;
}
returnPage = victim;
((CachedPage) returnPage).dpid = dpid;
} else {
// Case 2a/b
int pageHash = hash(victim.getDiskPageId());
CacheBucket bucket = pageMap[pageHash];
bucket.bucketLock.lock();
try {
// readjust the next pointers to remove this page from
// the pagemap
CachedPage curr = bucket.cachedPage;
CachedPage prev = null;
boolean found = false;
//traverse the bucket's linked list to find the victim.
while (curr != null) {
if (curr == victim) { // we found where the victim
// resides in the hash table
if (victim.pinCount.get() != 1) {
victim.pinCount.decrementAndGet();
break;
}
// if this is the first page in the bucket
if (prev == null) {
if(DEBUG) {
assert curr != curr.next;
}
bucket.cachedPage = bucket.cachedPage.next;
found = true;
break;
// if it isn't we need to make the previous
// node point to where it should
} else {
if(DEBUG) {
assert curr.next != curr;
}
prev.next = curr.next;
curr.next = null;
if(DEBUG) {
assert prev.next != prev;
}
found = true;
break;
}
}
// go to the next entry
prev = curr;
curr = curr.next;
}
if (found) {
returnPage = victim;
((CachedPage) returnPage).dpid = dpid;
} //otherwise, someone took the same victim before we acquired the lock. try again!
} finally {
bucket.bucketLock.unlock();
}
}
}
// if we found a page after all that, go ahead and finish
if (returnPage != null) {
((CachedPage) returnPage).confiscated.set(true);
if (DEBUG) {
confiscateLock.lock();
try{
confiscatedPages.add((CachedPage) returnPage);
confiscatedPagesOwner.put((CachedPage) returnPage, Thread.currentThread().getStackTrace());
}
finally{
confiscateLock.unlock();
}
}
return returnPage;
}
// no page available to confiscate. try kicking the cleaner thread.
synchronized (cleanerThread) {
pageCleanerPolicy.notifyVictimNotFound(cleanerThread);
}
// Heuristic optimization. Check whether the cleaner thread has
// cleaned pages since we did our last pin attempt.
if (cleanerThread.cleanedCount - startCleanedCount > MIN_CLEANED_COUNT_DIFF) {
// Don't go to sleep and wait for notification from the cleaner,
// just try to pin again immediately.
continue;
}
synchronized (cleanerThread.cleanNotification) {
try {
cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME);
} catch (InterruptedException e) {
// Do nothing
}
}
}
}