in src/core/persistent_memory_malloc.h [717:790]
/// Issues one asynchronous, full-page write to `file` for every page in the range that starts at
/// `start_page` and ends at the page containing `until_address` (that last page is included only
/// when `until_address` has a non-zero offset).
///
/// For each page, before the write is submitted, the page's flush status is switched to
/// InProgress (its close status is preserved) and its LastFlushedUntilAddress is reset to 0.
/// When a write completes, the callback publishes the flushed-until address for that page, marks
/// the page Flushed, clears and reopens the page if it had already been closed, and finally calls
/// ShiftFlushedUntilAddress() on the allocator.
///
/// \param start_page        First page to flush.
/// \param until_address     Address up to which data should be flushed; each page's context
///                          records min(end of that page, until_address).
/// \param serialize_objects Not read anywhere in this function body.
/// \return Status::Ok once every write has been submitted. The submission of each write goes
///         through RETURN_NOT_OK (macro defined elsewhere; presumably it returns the failing
///         status immediately -- confirm against its definition).
Status PersistentMemoryMalloc<D>::AsyncFlushPages(uint32_t start_page, Address until_address,
bool serialize_objects) {
// Per-page state handed to WriteAsync and read back by the completion callback: which allocator
// and page the write belongs to, and how far into the log this page's flush reaches.
class Context : public IAsyncContext {
public:
Context(alloc_t* allocator_, uint32_t page_, Address until_address_)
: allocator{ allocator_ }
, page{ page_ }
, until_address{ until_address_ } {
}
/// The deep-copy constructor
// All members are plain values / a non-owning pointer, so a member-wise copy is sufficient.
Context(const Context& other)
: allocator{ other.allocator }
, page{ other.page }
, until_address{ other.until_address } {
}
protected:
// Delegates to the IAsyncContext helper to produce the copy of this context.
// NOTE(review): the Context passed to WriteAsync below is a loop-local object, so the copy is
// presumably what the I/O layer keeps until the callback runs -- confirm in IAsyncContext.
Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
return IAsyncContext::DeepCopy_Internal(*this, context_copy);
}
public:
alloc_t* allocator;
uint32_t page;
Address until_address;
};
// Completion handler for a single page write. `bytes_transferred` is not used.
auto callback = [](IAsyncContext* ctxt, Status result, size_t bytes_transferred) {
// Typed wrapper around the raw context pointer (CallbackContext is defined elsewhere;
// presumably it also releases the context when it goes out of scope -- TODO confirm).
CallbackContext<Context> context{ ctxt };
if(result != Status::Ok) {
// The failure is only logged; execution falls through and the page is still treated as
// flushed below.
// NOTE(review): confirm that marking a page Flushed after a failed write is intended.
fprintf(stderr, "AsyncFlushPages(), error: %u\n", static_cast<uint8_t>(result));
}
// Publish how far this page has been flushed before flipping its status to Flushed.
context->allocator->PageStatus(context->page).LastFlushedUntilAddress.store(
context->until_address);
//Set the page status to flushed
// CAS loop: replace only the flush half of the status with Flushed, carrying over whatever
// close status is current. Assuming compare_exchange_weak follows std::atomic semantics, a
// failed attempt refreshes old_status with the current value, so new_status is rebuilt from
// fresh data on every retry and old_status ends up holding the value that was replaced.
FlushCloseStatus old_status = context->allocator->PageStatus(context->page).status.load();
FlushCloseStatus new_status;
do {
new_status = FlushCloseStatus{ FlushStatus::Flushed, old_status.close };
} while(!context->allocator->PageStatus(context->page).status.compare_exchange_weak(
old_status, new_status));
if(old_status.close == CloseStatus::Closed) {
// We finished flushing the page after it was closed, so we are responsible for clearing and
// reopening it.
// The whole in-memory page (kPageSize bytes) is zeroed, then its status becomes
// (Flushed, Open).
std::memset(context->allocator->Page(context->page), 0, kPageSize);
context->allocator->PageStatus(context->page).status.store(FlushStatus::Flushed,
CloseStatus::Open);
}
// Defined elsewhere; invoked after this page's status and flushed-until address are updated.
context->allocator->ShiftFlushedUntilAddress();
};
// Number of pages to write: every whole page in [start_page, until_address.page()), plus the
// page that contains until_address when that address is not exactly on a page boundary.
uint32_t num_pages = until_address.page() - start_page;
if(until_address.offset() > 0) {
++num_pages;
}
// An empty range is treated as a caller error. With assertions compiled out (NDEBUG), a zero
// count simply makes the loop below not execute.
assert(num_pages > 0);
for(uint32_t flush_page = start_page; flush_page < start_page + num_pages; ++flush_page) {
// NOTE(review): page_start_address is not referenced after this line.
Address page_start_address{ flush_page, 0 };
Address page_end_address{ flush_page + 1, 0 };
// The flushed-until address recorded for this page is capped at the end of the page, so only
// the last page of the range can carry until_address itself.
Context context{ this, flush_page, std::min(page_end_address, until_address) };
//Set status to in-progress
// Same CAS pattern as in the callback: change only the flush half to InProgress and keep the
// current close status.
FlushCloseStatus old_status = PageStatus(flush_page).status.load();
FlushCloseStatus new_status;
do {
new_status = FlushCloseStatus{ FlushStatus::InProgress, old_status.close };
} while(!PageStatus(flush_page).status.compare_exchange_weak(old_status, new_status));
// Reset this page's flushed-until marker; the callback overwrites it on completion.
PageStatus(flush_page).LastFlushedUntilAddress.store(0);
// Write the entire page (kPageSize bytes) at file offset kPageSize * flush_page.
// NOTE(review): if RETURN_NOT_OK returns early here, this page has already been marked
// InProgress with its flushed-until marker reset, and later pages are never submitted.
RETURN_NOT_OK(file->WriteAsync(Page(flush_page), kPageSize * flush_page, kPageSize, callback,
context));
}
return Status::Ok;
}