in hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java [215:438]
/**
 * Returns the cached page bound to {@code dpid}, with its pin count incremented.
 * <p>
 * The bucket selected by {@code hash(dpid)} is searched first. On a miss, a victim
 * page is requested from the page replacement strategy and rebound to {@code dpid}
 * (moving it between hash buckets if required). If no victim is available, the
 * cleaner policy is notified and the calling thread waits (bounded by
 * {@code PIN_MAX_WAIT_TIME}) before retrying. The method loops until a page can be
 * returned; every return statement yields a non-null page.
 *
 * @param dpid
 *            the disk page id to look up; also used to select the hash bucket
 * @param virtual
 *            not read anywhere in this method body
 * @return the page bound to {@code dpid}: either an existing entry of the hash
 *         table or a victim that has just been reset to {@code dpid}
 * @throws HyracksDataException
 *             declared by the signature; nothing in this body throws it directly
 *             (presumably raised by a collaborator -- verify against callees)
 */
private CachedPage findPage(long dpid, boolean virtual) throws HyracksDataException {
    while (true) {
        // Snapshot of the cleaner's progress, used by the heuristic at the bottom of the loop.
        int startCleanedCount = cleanerThread.cleanedCount;
        CachedPage cPage = null;
        /*
         * Hash dpid to get a bucket and then check if the page exists in
         * the bucket.
         */
        int hash = hash(dpid);
        CacheBucket bucket = pageMap[hash];
        bucket.bucketLock.lock();
        try {
            cPage = bucket.cachedPage;
            while (cPage != null) {
                if (DEBUG) {
                    // Sanity check: the head of the chain must not point to itself.
                    assert bucket.cachedPage != bucket.cachedPage.next;
                }
                if (cPage.dpid == dpid) {
                    if (DEBUG) {
                        assert !cPage.confiscated.get();
                    }
                    cPage.pinCount.incrementAndGet();
                    return cPage;
                }
                cPage = cPage.next;
            }
        } finally {
            bucket.bucketLock.unlock();
        }
        /*
         * If we got here, the page was not in the hash table. Now we ask
         * the page replacement strategy to find us a victim.
         */
        CachedPage victim = (CachedPage) pageReplacementStrategy.findVictim();
        if (victim != null) {
            /*
             * We have a victim with the following invariants.
             *   1. The dpid on the CachedPage may or may not be valid.
             *   2. We have a pin on the CachedPage.
             * We have to deal with three cases here.
             *
             * Case 1: The dpid on the CachedPage is invalid (-1). This
             * indicates that this buffer has never been used or is a
             * confiscated page. So we are the only ones holding it. Get a
             * lock on the required dpid's hash bucket, check if someone
             * inserted the page we want into the table. If so, decrement
             * the pinCount on the victim and return the winner page in the
             * table. If such a winner does not exist, insert the victim and
             * return it.
             *
             * Case 2: The dpid on the CachedPage is valid.
             *
             * Case 2a: The current dpid and required dpid hash to the same
             * bucket. Get the bucket lock, check that the victim is still
             * at pinCount == 1. If so, check if there is a winning
             * CachedPage with the required dpid. If so, decrement the
             * pinCount on the victim and return the winner. If not, update
             * the contents of the CachedPage to hold the required dpid and
             * return it. If the pinCount on the victim was != 1 or the
             * CachedPage was dirty, someone used the victim for its old
             * contents -- decrement the pinCount and retry.
             *
             * Case 2b: The current dpid and required dpid hash to different
             * buckets. Get the two bucket locks in the order of the bucket
             * indexes (ordering prevents deadlocks). Check for the
             * existence of a winner in the new bucket and for potential use
             * of the victim (pinCount != 1). If everything looks good,
             * remove the CachedPage from the old bucket, add it to the new
             * bucket and update its header with the new dpid.
             */
            if (victim.dpid < 0) {
                /*
                 * Case 1.
                 */
                bucket.bucketLock.lock();
                try {
                    if (DEBUG) {
                        // confiscatedPages is only inspected while holding confiscateLock.
                        confiscateLock.lock();
                        try {
                            if (confiscatedPages.contains(victim)) {
                                throw new IllegalStateException();
                            }
                        } finally {
                            confiscateLock.unlock();
                        }
                    }
                    // Re-check the bucket: another thread may have inserted dpid meanwhile.
                    cPage = bucket.cachedPage;
                    while (cPage != null) {
                        if (cPage.dpid == dpid) {
                            cPage.pinCount.incrementAndGet();
                            victim.pinCount.decrementAndGet();
                            if (DEBUG) {
                                assert !cPage.confiscated.get();
                            }
                            return cPage;
                        }
                        cPage = cPage.next;
                    }
                    // No winner: bind the victim to dpid and push it onto the head of the chain.
                    victim.reset(dpid);
                    victim.next = bucket.cachedPage;
                    bucket.cachedPage = victim;
                } finally {
                    bucket.bucketLock.unlock();
                }
                if (DEBUG) {
                    assert !victim.confiscated.get();
                }
                return victim;
            }
            int victimHash = hash(victim.dpid);
            if (victimHash == hash) {
                /*
                 * Case 2a.
                 */
                bucket.bucketLock.lock();
                try {
                    if (victim.pinCount.get() != 1) {
                        // Someone else pinned the victim: release our pin and retry.
                        // The finally block below still releases the bucket lock.
                        victim.pinCount.decrementAndGet();
                        continue;
                    }
                    if (DEBUG) {
                        confiscateLock.lock();
                        try {
                            if (confiscatedPages.contains(victim)) {
                                throw new IllegalStateException();
                            }
                        } finally {
                            confiscateLock.unlock();
                        }
                    }
                    cPage = bucket.cachedPage;
                    while (cPage != null) {
                        if (cPage.dpid == dpid) {
                            cPage.pinCount.incrementAndGet();
                            victim.pinCount.decrementAndGet();
                            if (DEBUG) {
                                // Check the page that is actually returned, as the
                                // other winner branches do.
                                assert !cPage.confiscated.get();
                            }
                            return cPage;
                        }
                        cPage = cPage.next;
                    }
                    // Victim already lives in the right bucket; only its contents change.
                    victim.reset(dpid);
                } finally {
                    bucket.bucketLock.unlock();
                }
                if (DEBUG) {
                    assert !victim.confiscated.get();
                }
                return victim;
            } else {
                /*
                 * Case 2b.
                 */
                CacheBucket victimBucket = pageMap[victimHash];
                // Always lock the lower-indexed bucket first to avoid deadlocks.
                if (victimHash < hash) {
                    victimBucket.bucketLock.lock();
                    bucket.bucketLock.lock();
                } else {
                    bucket.bucketLock.lock();
                    victimBucket.bucketLock.lock();
                }
                try {
                    if (victim.pinCount.get() != 1) {
                        // Someone else pinned the victim: release our pin and retry.
                        // The finally block below still releases both bucket locks.
                        victim.pinCount.decrementAndGet();
                        continue;
                    }
                    if (DEBUG) {
                        // Guard the lookup with confiscateLock, exactly as Case 1 and
                        // Case 2a do; bucket lock(s) are taken before confiscateLock
                        // in all three cases.
                        confiscateLock.lock();
                        try {
                            if (confiscatedPages.contains(victim)) {
                                throw new IllegalStateException();
                            }
                        } finally {
                            confiscateLock.unlock();
                        }
                    }
                    cPage = bucket.cachedPage;
                    while (cPage != null) {
                        if (cPage.dpid == dpid) {
                            cPage.pinCount.incrementAndGet();
                            victim.pinCount.decrementAndGet();
                            if (DEBUG) {
                                assert !cPage.confiscated.get();
                            }
                            return cPage;
                        }
                        cPage = cPage.next;
                    }
                    // Unlink the victim from its old bucket's chain.
                    if (victimBucket.cachedPage == victim) {
                        victimBucket.cachedPage = victim.next;
                    } else {
                        CachedPage victimPrev = victimBucket.cachedPage;
                        while (victimPrev != null && victimPrev.next != victim) {
                            victimPrev = victimPrev.next;
                        }
                        if (DEBUG) {
                            assert victimPrev != null;
                        }
                        // NOTE(review): if the victim is not in victimBucket's chain,
                        // victimPrev is null here and this dereference throws a
                        // NullPointerException (only asserted against when DEBUG is on).
                        victimPrev.next = victim.next;
                    }
                    // Bind the victim to dpid and push it onto the head of the new chain.
                    victim.reset(dpid);
                    victim.next = bucket.cachedPage;
                    bucket.cachedPage = victim;
                } finally {
                    victimBucket.bucketLock.unlock();
                    bucket.bucketLock.unlock();
                }
                if (DEBUG) {
                    assert !victim.confiscated.get();
                }
                return victim;
            }
        }
        // No victim was available: tell the cleaner policy about it.
        synchronized (cleanerThread) {
            pageCleanerPolicy.notifyVictimNotFound(cleanerThread);
        }
        // Heuristic optimization. Check whether the cleaner thread has
        // cleaned pages since we did our last pin attempt.
        if (cleanerThread.cleanedCount - startCleanedCount > MIN_CLEANED_COUNT_DIFF) {
            // Don't go to sleep and wait for notification from the cleaner,
            // just try to pin again immediately.
            continue;
        }
        synchronized (cleanerThread.cleanNotification) {
            try {
                // A single bounded wait is enough: the outer loop retries whatever
                // the reason for waking up (notification, timeout or interrupt).
                cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME);
            } catch (InterruptedException e) {
                // Do nothing: the outer loop simply retries.
                // NOTE(review): the thread's interrupt status is dropped here.
                // Restoring it inside this loop would make every later wait()
                // throw immediately, so any change needs a deliberate policy.
            }
        }
    }
}