private CachedPage findPage()

in hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java [215:438]


    private CachedPage findPage(long dpid, boolean virtual) throws HyracksDataException {
        while (true) {
            int startCleanedCount = cleanerThread.cleanedCount;

            CachedPage cPage = null;
            /*
             * Hash dpid to get a bucket and then check if the page exists in
             * the bucket.
             */
            int hash = hash(dpid);
            CacheBucket bucket = pageMap[hash];
            bucket.bucketLock.lock();
            try {
                cPage = bucket.cachedPage;
                while (cPage != null) {
                    if(DEBUG) {
                        assert bucket.cachedPage != bucket.cachedPage.next;
                    }
                    if (cPage.dpid == dpid) {
                        if(DEBUG) {
                            assert !cPage.confiscated.get();
                        }
                        cPage.pinCount.incrementAndGet();
                        return cPage;
                    }
                    cPage = cPage.next;
                }
            } finally {
                bucket.bucketLock.unlock();
            }
            /*
             * If we got here, the page was not in the hash table. Now we ask
             * the page replacement strategy to find us a victim.
             */
            CachedPage victim = (CachedPage) pageReplacementStrategy.findVictim();
            if (victim != null) {
                /*
                 * We have a victim with the following invariants. 1. The dpid
                 * on the CachedPage may or may not be valid. 2. We have a pin
                 * on the CachedPage. We have to deal with three cases here.
                 * Case 1: The dpid on the CachedPage is invalid (-1). This
                 * indicates that this buffer has never been used or is a
                 * confiscated page. So we are the only ones holding it. Get a lock
                 * on the required dpid's hash bucket, check if someone inserted
                 * the page we want into the table. If so, decrement the
                 * pincount on the victim and return the winner page in the
                 * table. If such a winner does not exist, insert the victim and
                 * return it. Case 2: The dpid on the CachedPage is valid. Case
                 * 2a: The current dpid and required dpid hash to the same
                 * bucket. Get the bucket lock, check that the victim is still
                 * at pinCount == 1 If so check if there is a winning CachedPage
                 * with the required dpid. If so, decrement the pinCount on the
                 * victim and return the winner. If not, update the contents of
                 * the CachedPage to hold the required dpid and return it. If
                 * the picCount on the victim was != 1 or CachedPage was dirty
                 * someone used the victim for its old contents -- Decrement the
                 * pinCount and retry. Case 2b: The current dpid and required
                 * dpid hash to different buckets. Get the two bucket locks in
                 * the order of the bucket indexes (Ordering prevents
                 * deadlocks). Check for the existence of a winner in the new
                 * bucket and for potential use of the victim (pinCount != 1).
                 * If everything looks good, remove the CachedPage from the old
                 * bucket, and add it to the new bucket and update its header
                 * with the new dpid.
                 */
                if (victim.dpid < 0) {
                    /*
                     * Case 1.
                     */
                    bucket.bucketLock.lock();
                    try {
                        if (DEBUG) {
                            confiscateLock.lock();
                            try{
                                if (confiscatedPages.contains(victim)) {
                                    throw new IllegalStateException();
                                }
                            } finally{
                                confiscateLock.unlock();
                            }
                        }
                        cPage = bucket.cachedPage;
                        while (cPage != null) {
                            if (cPage.dpid == dpid) {
                                cPage.pinCount.incrementAndGet();
                                victim.pinCount.decrementAndGet();
                                if(DEBUG) {
                                    assert !cPage.confiscated.get();
                                }
                                return cPage;
                            }
                            cPage = cPage.next;
                        }
                        victim.reset(dpid);
                        victim.next = bucket.cachedPage;
                        bucket.cachedPage = victim;
                    } finally {
                        bucket.bucketLock.unlock();
                    }

                    if(DEBUG) {
                        assert !victim.confiscated.get();
                    }
                    return victim;
                }
                int victimHash = hash(victim.dpid);
                if (victimHash == hash) {
                    /*
                     * Case 2a.
                     */
                    bucket.bucketLock.lock();
                    try {
                        if (victim.pinCount.get() != 1) {
                            victim.pinCount.decrementAndGet();
                            continue;
                        }
                        if (DEBUG) {
                            confiscateLock.lock();
                            try{
                                if (confiscatedPages.contains(victim)) {
                                    throw new IllegalStateException();
                                }
                            }finally{
                                confiscateLock.unlock();
                            }
                        }
                        cPage = bucket.cachedPage;
                        while (cPage != null) {
                            if (cPage.dpid == dpid) {
                                cPage.pinCount.incrementAndGet();
                                victim.pinCount.decrementAndGet();
                                if(DEBUG) {
                                    assert !victim.confiscated.get();
                                }
                                return cPage;
                            }
                            cPage = cPage.next;
                        }
                        victim.reset(dpid);
                    } finally {
                        bucket.bucketLock.unlock();
                    }
                    if(DEBUG) {
                        assert !victim.confiscated.get();
                    }
                    return victim;
                } else {
                    /*
                     * Case 2b.
                     */
                    CacheBucket victimBucket = pageMap[victimHash];
                    if (victimHash < hash) {
                        victimBucket.bucketLock.lock();
                        bucket.bucketLock.lock();
                    } else {
                        bucket.bucketLock.lock();
                        victimBucket.bucketLock.lock();
                    }
                    try {
                        if (victim.pinCount.get() != 1) {
                            victim.pinCount.decrementAndGet();
                            continue;
                        }
                        if (DEBUG) {
                            if (confiscatedPages.contains(victim)) {
                                throw new IllegalStateException();
                            }
                        }
                        cPage = bucket.cachedPage;
                        while (cPage != null) {
                            if (cPage.dpid == dpid) {
                                cPage.pinCount.incrementAndGet();
                                victim.pinCount.decrementAndGet();
                                if(DEBUG) {
                                    assert !cPage.confiscated.get();
                                }
                                return cPage;
                            }
                            cPage = cPage.next;
                        }
                        if (victimBucket.cachedPage == victim) {
                            victimBucket.cachedPage = victim.next;
                        } else {
                            CachedPage victimPrev = victimBucket.cachedPage;
                            while (victimPrev != null && victimPrev.next != victim) {
                                victimPrev = victimPrev.next;
                            }
                            if(DEBUG) {
                                assert victimPrev != null;
                            }
                            victimPrev.next = victim.next;
                        }
                        victim.reset(dpid);
                        victim.next = bucket.cachedPage;
                        bucket.cachedPage = victim;
                    } finally {
                        victimBucket.bucketLock.unlock();
                        bucket.bucketLock.unlock();
                    }
                    if(DEBUG) {
                        assert !victim.confiscated.get();
                    }
                    return victim;
                }
            }
            synchronized (cleanerThread) {
                pageCleanerPolicy.notifyVictimNotFound(cleanerThread);
            }
            // Heuristic optimization. Check whether the cleaner thread has
            // cleaned pages since we did our last pin attempt.
            if (cleanerThread.cleanedCount - startCleanedCount > MIN_CLEANED_COUNT_DIFF) {
                // Don't go to sleep and wait for notification from the cleaner,
                // just try to pin again immediately.
                continue;
            }
            synchronized (cleanerThread.cleanNotification) {
                try {
                    cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME);
                } catch (InterruptedException e) {
                    // Do nothing
                }
            }
        }
    }