in modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java [640:815]
private long acquirePage(
int grpId,
long pageId,
IoStatisticsHolder statHolder,
boolean restore,
@Nullable AtomicBoolean pageAllocated
) throws IgniteInternalCheckedException {
assert started;
int partId = partitionId(pageId);
Segment seg = segment(grpId, pageId);
seg.readLock().lock();
try {
long relPtr = seg.loadedPages.get(
grpId,
effectivePageId(pageId),
seg.partGeneration(grpId, partId),
INVALID_REL_PTR,
INVALID_REL_PTR
);
// The page is loaded to the memory.
if (relPtr != INVALID_REL_PTR) {
long absPtr = seg.absolute(relPtr);
seg.acquirePage(absPtr);
seg.pageReplacementPolicy.onHit(relPtr);
statHolder.trackLogicalRead(absPtr + PAGE_OVERHEAD);
return absPtr;
}
} finally {
seg.readLock().unlock();
}
FullPageId fullId = new FullPageId(pageId, grpId);
seg.writeLock().lock();
long lockedPageAbsPtr = -1;
boolean readPageFromStore = false;
try {
// Double-check.
long relPtr = seg.loadedPages.get(
grpId,
fullId.effectivePageId(),
seg.partGeneration(grpId, partId),
INVALID_REL_PTR,
OUTDATED_REL_PTR
);
long absPtr;
if (relPtr == INVALID_REL_PTR) {
relPtr = seg.borrowOrAllocateFreePage(pageId);
if (pageAllocated != null) {
pageAllocated.set(true);
}
if (relPtr == INVALID_REL_PTR) {
relPtr = seg.removePageForReplacement();
}
absPtr = seg.absolute(relPtr);
fullPageId(absPtr, fullId);
writeTimestamp(absPtr, coarseCurrentTimeMillis());
assert !isAcquired(absPtr) :
"Pin counter must be 0 for a new page [relPtr=" + hexLong(relPtr) + ", absPtr=" + hexLong(absPtr) + ']';
// We can clear dirty flag after the page has been allocated.
setDirty(fullId, absPtr, false, false);
seg.pageReplacementPolicy.onMiss(relPtr);
seg.loadedPages.put(
grpId,
fullId.effectivePageId(),
relPtr,
seg.partGeneration(grpId, partId)
);
long pageAddr = absPtr + PAGE_OVERHEAD;
if (!restore) {
delayedPageReplacementTracker.waitUnlock(fullId);
readPageFromStore = true;
} else {
zeroMemory(absPtr + PAGE_OVERHEAD, pageSize());
// Must init page ID in order to ensure RWLock tag consistency.
setPageId(pageAddr, pageId);
}
rwLock.init(absPtr + PAGE_LOCK_OFFSET, tag(pageId));
if (readPageFromStore) {
boolean locked = rwLock.writeLock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);
assert locked : "Page ID " + fullId + " expected to be locked";
lockedPageAbsPtr = absPtr;
}
} else if (relPtr == OUTDATED_REL_PTR) {
assert pageIndex(pageId) == 0 : fullId;
relPtr = seg.refreshOutdatedPage(grpId, pageId, false);
absPtr = seg.absolute(relPtr);
long pageAddr = absPtr + PAGE_OVERHEAD;
zeroMemory(pageAddr, pageSize());
fullPageId(absPtr, fullId);
writeTimestamp(absPtr, coarseCurrentTimeMillis());
setPageId(pageAddr, pageId);
assert !isAcquired(absPtr) :
"Pin counter must be 0 for a new page [relPtr=" + hexLong(relPtr) + ", absPtr=" + hexLong(absPtr) + ']';
rwLock.init(absPtr + PAGE_LOCK_OFFSET, tag(pageId));
seg.pageReplacementPolicy.onRemove(relPtr);
seg.pageReplacementPolicy.onMiss(relPtr);
} else {
absPtr = seg.absolute(relPtr);
seg.pageReplacementPolicy.onHit(relPtr);
}
seg.acquirePage(absPtr);
if (!readPageFromStore) {
statHolder.trackLogicalRead(absPtr + PAGE_OVERHEAD);
}
return absPtr;
} finally {
seg.writeLock().unlock();
delayedPageReplacementTracker.delayedPageWrite().finishReplacement();
if (readPageFromStore) {
assert lockedPageAbsPtr != -1 : "Page is expected to have a valid address [pageId=" + fullId
+ ", lockedPageAbsPtr=" + hexLong(lockedPageAbsPtr) + ']';
assert isPageWriteLocked(lockedPageAbsPtr) : "Page is expected to be locked: [pageId=" + fullId + "]";
long pageAddr = lockedPageAbsPtr + PAGE_OVERHEAD;
ByteBuffer buf = wrapPointer(pageAddr, pageSize());
long actualPageId = 0;
try {
pageStoreManager.read(grpId, pageId, buf, false);
statHolder.trackPhysicalAndLogicalRead(pageAddr);
actualPageId = getPageId(buf);
} finally {
rwLock.writeUnlock(lockedPageAbsPtr + PAGE_LOCK_OFFSET, actualPageId == 0 ? TAG_LOCK_ALWAYS : tag(actualPageId));
}
}
}
}